consumer_group.go
package kgo
import (
"bytes"
"context"
"errors"
"fmt"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kmsg"
)
type groupConsumer struct {
c *consumer // used to change consumer state; generally c.mu is grabbed on access
cl *Client // used for running requests / adding to topics map
cfg *cfg
ctx context.Context
cancel func()
manageDone chan struct{} // closed once when the manage goroutine quits
cooperative atomicBool // true if the group balancer chosen during Join is cooperative
// The data for topics that the user assigned. Metadata updates the
// atomic.Value in each pointer atomically. If we are consuming via
// regex, metadata grabs the lock to add new topics.
tps *topicsPartitions
reSeen map[string]bool // topics we evaluated against regex, and whether we want them or not
// Full lock grabbed in CommitOffsetsSync, read lock grabbed in
// CommitOffsets, this lock ensures that only one sync commit can
// happen at once, and if it is happening, no other commit can be
// happening.
syncCommitMu sync.RWMutex
rejoinCh chan string // cap 1; sent to if subscription changes (regex)
// For EOS, before we commit, we force a heartbeat. If the client and
// group member are both configured properly, then the transactional
// timeout will be less than the session timeout. By forcing a
// heartbeat before the commit, if the heartbeat was successful, then
// we ensure that we will complete the transaction within the group
// session, meaning we will not commit after the group has rebalanced.
heartbeatForceCh chan func(error)
// The following two are only updated in the manager / join&sync loop.
// The nowAssigned map is read when commits fail: if the commit fails
// with ILLEGAL_GENERATION and it contains only partitions that are in
// nowAssigned, we re-issue.
lastAssigned map[string][]int32
nowAssigned amtps
// Fetching ensures we continue fetching offsets across cooperative
// rebalance if an offset fetch returns early due to an immediate
// rebalance. See the large comment on adjustCooperativeFetchOffsets
// for more details.
//
// This is modified only in that function, or in the manage loop on a
// hard error once the heartbeat/fetch has returned.
fetching map[string]map[int32]struct{}
// onFetchedMu ensures we do not call onFetched nor adjustOffsets
// concurrent with onRevoked.
//
// The group session itself ensures that OnPartitions functions are
// serial, but offset fetching is concurrent with heartbeating and can
// finish before or after heartbeating has already detected a revoke.
// To make user lives easier, we guarantee that offset fetch callbacks
// cannot be concurrent with onRevoked with this mu. If fetch callbacks
// are present, we hook this mu into onRevoked, and we grab it in the
// locations fetch callbacks are called. We only have to worry about
// onRevoked because fetching offsets occurs after onAssigned, and
// onLost happens after fetching offsets is done.
onFetchedMu sync.Mutex
// leader is whether we are the leader right now. This field is:
//
// - set to false at the beginning of a join group session
// - set to true if join group response indicates we are leader
// - read on metadata updates in findNewAssignments
leader atomicBool
// Set to true when committing transaction offsets while ending a
// transaction, and then set back to false immediately before calling
// EndTransaction.
offsetsAddedToTxn bool
// If we are leader, then other members may express interest to consume
// topics that we are not interested in consuming. We track the entire
// group's topics in external, and our fetchMetadata loop uses this.
// We store this as a pointer for address comparisons.
external atomic.Value // *groupExternal
//////////////
// mu block //
//////////////
mu sync.Mutex
// using is updated when finding new assignments; we always add to this
// if we want to consume a topic (or see there are more potential
// partitions). Only the leader can trigger a new group session if there
// are simply more partitions for existing topics.
//
// This is read when joining a group or leaving a group.
using map[string]int // topics *we* are currently using => # partitions known in that topic
// uncommitted is read and updated all over:
// - updated before PollFetches returns
// - updated when directly setting offsets (to rewind, for transactions)
// - emptied when leaving a group
// - updated when revoking
// - updated after fetching offsets once we receive our group assignment
// - updated after we commit
// - read when getting uncommitted or committed
uncommitted uncommitted
// memberID and generation are written to in the join and sync loop,
// and mostly read within that loop. The reason these two are under the
// mutex is because they are read during commits, which can happen at
// any arbitrary moment. It is **recommended** to be done within the
// context of a group session, but (a) users may have some unique use
// cases, and (b) the onRevoke hook may take longer than a user
// expects, which would rotate a session.
memberID string
generation int32
// commitCancel and commitDone are set under mu before firing off an
// async commit request. If another commit happens, it cancels the
// prior commit, waits for the prior to be done, and then starts its
// own.
commitCancel func()
commitDone chan struct{}
// blockAuto is set and cleared in CommitOffsets{,Sync} to block
// autocommitting if autocommitting is active. This ensures that an
// autocommit does not cancel the user's manual commit.
blockAuto bool
// We set this once to manage the group lifecycle once.
managing bool
dying bool // set when closing, read in findNewAssignments
}
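// The commit-related fields above (syncCommitMu, uncommitted, blockAuto) back
// the user-facing commit paths. A caller-side sketch of how those paths are
// typically exercised follows; it is illustrative only, and the broker
// address, group, and topic names are placeholders:
//
//	cl, err := kgo.NewClient(
//		kgo.SeedBrokers("localhost:9092"),
//		kgo.ConsumerGroup("my-group"),
//		kgo.ConsumeTopics("my-topic"),
//		kgo.DisableAutoCommit(), // commit manually below
//	)
//	if err != nil {
//		panic(err)
//	}
//	defer cl.Close()
//
//	for {
//		fetches := cl.PollFetches(context.Background())
//		fetches.EachRecord(func(r *kgo.Record) {
//			_ = r // process the record
//		})
//		// Synchronously commit what the poll above dirtied (the sync
//		// path that syncCommitMu above guards).
//		if err := cl.CommitUncommittedOffsets(context.Background()); err != nil {
//			// handle or log the commit error
//		}
//	}
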
// LeaveGroup leaves a group if in one. Calling the client's Close function
// also leaves a group, so this is only necessary to call if you plan to leave
// the group and continue using the client.
//
// If you have overridden the default revoke, you must manually commit offsets
// before leaving the group.
//
// If you have configured the group with an InstanceID, this does not leave the
// group. With instance IDs, it is expected that clients will restart and
// re-use the same instance ID. To leave a group using an instance ID, you must
// manually issue a kmsg.LeaveGroupRequest or use an external tool (kafka
// scripts or kcl).
func (cl *Client) LeaveGroup() {
c := &cl.consumer
if c.g == nil {
return
}
c.waitAndAddRebalance()
c.mu.Lock() // lock for assign
c.assignPartitions(nil, assignInvalidateAll, noTopicsPartitions, "invalidating all assignments in LeaveGroup")
wait := c.g.leave()
c.mu.Unlock()
c.unaddRebalance()
wait() // wait after we unlock
}
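// A small usage sketch for LeaveGroup (illustrative; it assumes a client that
// was configured with kgo.ConsumerGroup and kgo.ConsumeTopics): leave the
// group explicitly while keeping the client alive for other work.
//
//	// done consuming as part of the group; commit first if you have
//	// overridden the default revoke, per the doc comment above
//	cl.LeaveGroup()
//
//	// the client remains usable afterwards, for example to produce
//	cl.ProduceSync(ctx, &kgo.Record{Topic: "audit", Value: []byte("left group")})
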
// GroupMetadata returns the current group member ID and generation, or an
// empty string and -1 if not in the group.
func (cl *Client) GroupMetadata() (string, int32) {
g := cl.consumer.g
if g == nil {
return "", -1
}
g.mu.Lock()
defer g.mu.Unlock()
if g.memberID == "" {
return "", -1
}
return g.memberID, g.generation
}
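// An illustrative use of GroupMetadata (a sketch; the logging is just an
// example destination for the values):
//
//	memberID, generation := cl.GroupMetadata()
//	if generation < 0 {
//		return // not currently in a group session
//	}
//	log.Printf("group member %q at generation %d", memberID, generation)
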
func (c *consumer) initGroup() {
ctx, cancel := context.WithCancel(c.cl.ctx)
g := &groupConsumer{
c: c,
cl: c.cl,
cfg: &c.cl.cfg,
ctx: ctx,
cancel: cancel,
reSeen: make(map[string]bool),
manageDone: make(chan struct{}),
tps: newTopicsPartitions(),
rejoinCh: make(chan string, 1),
heartbeatForceCh: make(chan func(error)),
using: make(map[string]int),
}
c.g = g
if !g.cfg.setCommitCallback {
g.cfg.commitCallback = g.defaultCommitCallback
}
if g.cfg.txnID == nil {
// We only override revoked / lost if they were not explicitly
// set by options.
if !g.cfg.setRevoked {
g.cfg.onRevoked = g.defaultRevoke
}
// For onLost, we do not want to commit in onLost, so we
// explicitly set onLost to an empty function to avoid the
// fallback to onRevoked.
if !g.cfg.setLost {
g.cfg.onLost = func(context.Context, *Client, map[string][]int32) {}
}
} else {
g.cfg.autocommitDisable = true
}
for _, logOn := range []struct {
name string
set *func(context.Context, *Client, map[string][]int32)
}{
{"OnPartitionsAssigned", &g.cfg.onAssigned},
{"OnPartitionsRevoked", &g.cfg.onRevoked},
{"OnPartitionsLost", &g.cfg.onLost},
} {
user := *logOn.set
name := logOn.name
*logOn.set = func(ctx context.Context, cl *Client, m map[string][]int32) {
var ctxExpired bool
select {
case <-ctx.Done():
ctxExpired = true
default:
}
if ctxExpired {
cl.cfg.logger.Log(LogLevelDebug, "entering "+name, "with", m, "context_expired", ctxExpired)
} else {
cl.cfg.logger.Log(LogLevelDebug, "entering "+name, "with", m)
}
if user != nil {
dup := make(map[string][]int32)
for k, vs := range m {
dup[k] = append([]int32(nil), vs...)
}
user(ctx, cl, dup)
}
}
}
if g.cfg.onFetched != nil || g.cfg.adjustOffsetsBeforeAssign != nil {
revoked := g.cfg.onRevoked
g.cfg.onRevoked = func(ctx context.Context, cl *Client, m map[string][]int32) {
g.onFetchedMu.Lock()
defer g.onFetchedMu.Unlock()
revoked(ctx, cl, m)
}
}
// For non-regex topics, we explicitly ensure they exist for loading
// metadata. This is of no impact if we are *also* consuming via regex,
// but that is no problem.
if len(g.cfg.topics) > 0 && !g.cfg.regex {
topics := make([]string, 0, len(g.cfg.topics))
for topic := range g.cfg.topics {
topics = append(topics, topic)
}
g.tps.storeTopics(topics)
}
if !g.cfg.autocommitDisable && g.cfg.autocommitInterval > 0 {
g.cfg.logger.Log(LogLevelInfo, "beginning autocommit loop", "group", g.cfg.group)
go g.loopCommit()
}
}
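// The configuration consumed by initGroup above is supplied when constructing
// the client. A hedged sketch of options that exercise these paths (group
// membership, explicit partition callbacks that the logging wrappers above
// decorate, and an autocommit interval); all names are placeholders:
//
//	cl, err := kgo.NewClient(
//		kgo.SeedBrokers("localhost:9092"),
//		kgo.ConsumerGroup("my-group"),
//		kgo.ConsumeTopics("orders"),
//		kgo.AutoCommitInterval(5*time.Second),
//		kgo.OnPartitionsAssigned(func(ctx context.Context, cl *kgo.Client, assigned map[string][]int32) {
//			// start any per-partition workers here
//		}),
//		kgo.OnPartitionsRevoked(func(ctx context.Context, cl *kgo.Client, revoked map[string][]int32) {
//			// flush or commit work for revoked partitions here
//		}),
//	)
//	if err != nil {
//		panic(err)
//	}
//	defer cl.Close()
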
// Manages the group consumer's join / sync / heartbeat / fetch offset flow.
//
// Once a group is assigned, we fire a metadata request for all topics the
// assignment specified interest in. Only after we finally have some topic
// metadata do we join the group, and once joined, this management runs in a
// dedicated goroutine until the group is left.
func (g *groupConsumer) manage() {
defer close(g.manageDone)
g.cfg.logger.Log(LogLevelInfo, "beginning to manage the group lifecycle", "group", g.cfg.group)
var consecutiveErrors int
joinWhy := "beginning to manage the group lifecycle"
for {
if joinWhy == "" {
joinWhy = "rejoining from normal rebalance"
}
err := g.joinAndSync(joinWhy)
if err == nil {
if joinWhy, err = g.setupAssignedAndHeartbeat(); err != nil {
if errors.Is(err, kerr.RebalanceInProgress) {
err = nil
}
}
}
if err == nil {
consecutiveErrors = 0
continue
}
joinWhy = "rejoining after we previously errored and backed off"
// If the user has BlockPollOnRebalance enabled, we have to
// block around the onLost and assigning.
g.c.waitAndAddRebalance()
if errors.Is(err, context.Canceled) && g.cfg.onRevoked != nil {
// The cooperative consumer does not revoke everything
// while rebalancing, meaning if our context is
// canceled, we may have uncommitted data. Rather than
// diving into onLost, we should go into onRevoked,
// because for the most part, a context cancelation
// means we are leaving the group. Going into onRevoked
// gives us an opportunity to commit outstanding
// offsets. For the eager consumer, since we always
// revoke before exiting the heartbeat loop, we do not
// really care so much about *needing* to call
// onRevoked, but since we are handling this case for
// the cooperative consumer we may as well just also
// include the eager consumer.
g.cfg.onRevoked(g.cl.ctx, g.cl, g.nowAssigned.read())
} else {
// Any other error is perceived as a fatal error,
// and we go into onLost as appropriate.
if g.cfg.onLost != nil {
g.cfg.onLost(g.cl.ctx, g.cl, g.nowAssigned.read())
}
g.cfg.hooks.each(func(h Hook) {
if h, ok := h.(HookGroupManageError); ok {
h.OnGroupManageError(err)
}
})
g.c.addFakeReadyForDraining("", 0, &ErrGroupSession{err})
}
// If we are eager, we should have invalidated everything
// before getting here, but we do so doubly just in case.
//
// If we are cooperative, the join and sync could have failed
// during the cooperative rebalance where we were still
// consuming. We need to invalidate everything. Waiting to
// resume from poll is necessary, but the user will likely be
// unable to commit.
{
g.c.mu.Lock()
g.c.assignPartitions(nil, assignInvalidateAll, nil, "clearing assignment at end of group management session")
g.mu.Lock() // before allowing poll to touch uncommitted, lock the group
g.c.mu.Unlock() // now part of poll can continue
g.uncommitted = nil
g.mu.Unlock()
g.nowAssigned.store(nil)
g.lastAssigned = nil
g.fetching = nil
g.leader.Store(false)
g.resetExternal()
}
// Unblock polling now that we have called onLost and
// re-assigned.
g.c.unaddRebalance()
if errors.Is(err, context.Canceled) { // context was canceled, quit now
return
}
// Waiting for the backoff is a good time to update our
// metadata; maybe the error is from stale metadata.
consecutiveErrors++
backoff := g.cfg.retryBackoff(consecutiveErrors)
g.cfg.logger.Log(LogLevelError, "join and sync loop errored",
"group", g.cfg.group,
"err", err,
"consecutive_errors", consecutiveErrors,
"backoff", backoff,
)
deadline := time.Now().Add(backoff)
g.cl.waitmeta(g.ctx, backoff, "waitmeta during join & sync error backoff")
after := time.NewTimer(time.Until(deadline))
select {
case <-g.ctx.Done():
after.Stop()
return
case <-after.C:
}
}
}
func (g *groupConsumer) leave() (wait func()) {
// If g.using is nonzero before this check, then a manage goroutine has
// started. If not, it will never start because we set dying.
g.mu.Lock()
wasDead := g.dying
g.dying = true
wasManaging := g.managing
g.mu.Unlock()
done := make(chan struct{})
go func() {
defer close(done)
g.cancel()
if wasManaging {
// We want to wait for the manage goroutine to be done
// so that we call the user's on{Assign,Revoke,Lost}.
<-g.manageDone
}
if wasDead {
// If we already called leave(), then we just wait for
// the prior leave to finish and we avoid re-issuing a
// LeaveGroup request.
return
}
if g.cfg.instanceID == nil {
g.cfg.logger.Log(LogLevelInfo, "leaving group",
"group", g.cfg.group,
"member_id", g.memberID, // lock not needed now since nothing can change it (manageDone)
)
// If we error when leaving, there is not much
// we can do. We may as well just return.
req := kmsg.NewPtrLeaveGroupRequest()
req.Group = g.cfg.group
req.MemberID = g.memberID
member := kmsg.NewLeaveGroupRequestMember()
member.MemberID = g.memberID
member.Reason = kmsg.StringPtr("client leaving group per normal operation")
req.Members = append(req.Members, member)
req.RequestWith(g.cl.ctx, g.cl)
}
}()
return func() { <-done }
}
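// For statically-membered groups (an InstanceID is configured), LeaveGroup
// does not issue the request above, so leaving must be done by hand. A sketch
// mirroring the request construction in leave(); the group and instance ID
// values are placeholders, and the member field names assume the kmsg
// LeaveGroup schema (KIP-345):
//
//	req := kmsg.NewPtrLeaveGroupRequest()
//	req.Group = "my-group"
//	member := kmsg.NewLeaveGroupRequestMember()
//	member.InstanceID = kmsg.StringPtr("my-instance-id")
//	member.Reason = kmsg.StringPtr("static member intentionally leaving")
//	req.Members = append(req.Members, member)
//	if _, err := req.RequestWith(ctx, cl); err != nil {
//		// handle the error; there is little to do beyond logging it
//	}
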
// returns the difference of g.nowAssigned and g.lastAssigned.
func (g *groupConsumer) diffAssigned() (added, lost map[string][]int32) {
nowAssigned := g.nowAssigned.clone()
if !g.cooperative.Load() {
return nowAssigned, nil
}
added = make(map[string][]int32, len(nowAssigned))
lost = make(map[string][]int32, len(nowAssigned))
// First, we diff lasts: any topic in last but not now is lost,
// otherwise, (1) new partitions are added, (2) common partitions are
// ignored, and (3) partitions no longer in now are lost.
lasts := make(map[int32]struct{}, 100)
for topic, lastPartitions := range g.lastAssigned {
nowPartitions, exists := nowAssigned[topic]
if !exists {
lost[topic] = lastPartitions
continue
}
for _, lastPartition := range lastPartitions {
lasts[lastPartition] = struct{}{}
}
// Anything now that does not exist in last is new,
// otherwise it is in common and we ignore it.
for _, nowPartition := range nowPartitions {
if _, exists := lasts[nowPartition]; !exists {
added[topic] = append(added[topic], nowPartition)
} else {
delete(lasts, nowPartition)
}
}
// Anything remaining in last does not exist now
// and is thus lost.
for last := range lasts {
lost[topic] = append(lost[topic], last)
delete(lasts, last) // reuse lasts
}
}
// Finally, any new topics in now assigned are strictly added.
for topic, nowPartitions := range nowAssigned {
if _, exists := g.lastAssigned[topic]; !exists {
added[topic] = nowPartitions
}
}
return added, lost
}
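// A worked example of the cooperative diff above. Assuming these assignments:
//
//	lastAssigned: {"foo": [0 1 2], "bar": [0]}
//	nowAssigned:  {"foo": [1 2 3], "baz": [0]}
//
// diffAssigned returns
//
//	added: {"foo": [3], "baz": [0]}   // new partition 3 and new topic baz
//	lost:  {"foo": [0], "bar": [0]}   // partition 0 dropped and topic bar gone entirely
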
type revokeStage int8
const (
revokeLastSession = iota
revokeThisSession
)
// revoke calls onRevoked for partitions that this group member is losing and
// updates the uncommitted map after the revoke.
//
// For eager consumers, this simply revokes g.assigned. This will only be
// called at the end of a group session.
//
// For cooperative consumers, this either
//
// (1) if revoking lost partitions from a prior session (i.e., after sync),
// this revokes the passed in lost
// (2) if revoking at the end of a session, this revokes topics that the
// consumer is no longer interested in consuming
//
// Lastly, for cooperative consumers, this must selectively delete what was
// lost from the uncommitted map.
func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leaving bool) {
g.c.waitAndAddRebalance()
defer g.c.unaddRebalance()
if !g.cooperative.Load() || leaving { // stage == revokeThisSession if not cooperative
// If we are an eager consumer, we stop fetching all of our
// current partitions as we will be revoking them.
g.c.mu.Lock()
if leaving {
g.c.assignPartitions(nil, assignInvalidateAll, nil, "revoking all assignments because we are leaving the group")
} else {
g.c.assignPartitions(nil, assignInvalidateAll, nil, "revoking all assignments because we are not cooperative")
}
g.c.mu.Unlock()
if !g.cooperative.Load() {
g.cfg.logger.Log(LogLevelInfo, "eager consumer revoking prior assigned partitions", "group", g.cfg.group, "revoking", g.nowAssigned.read())
} else {
g.cfg.logger.Log(LogLevelInfo, "cooperative consumer revoking prior assigned partitions because leaving group", "group", g.cfg.group, "revoking", g.nowAssigned.read())
}
if g.cfg.onRevoked != nil {
g.cfg.onRevoked(g.cl.ctx, g.cl, g.nowAssigned.read())
}
g.nowAssigned.store(nil)
// After nilling uncommitted here, nothing should recreate
// uncommitted until a future fetch after the group is
// rejoined. This _can_ be broken with a manual SetOffsets or
// with CommitOffsets{,Sync} but we explicitly document not
// to do that outside the context of a live group session.
g.mu.Lock()
g.uncommitted = nil
g.mu.Unlock()
return
}
switch stage {
case revokeLastSession:
// we use lost in this case
case revokeThisSession:
// lost is nil for cooperative assigning. Instead, we determine
// lost by finding subscriptions we are no longer interested
// in. This would be from a user's PurgeConsumeTopics call.
//
// We just paused metadata, but purging triggers a rebalance
// which causes a new metadata request -- in short, this could
// be concurrent with a metadata findNewAssignments, so we
// lock.
g.nowAssigned.write(func(nowAssigned map[string][]int32) {
g.mu.Lock()
for topic, partitions := range nowAssigned {
if _, exists := g.using[topic]; !exists {
if lost == nil {
lost = make(map[string][]int32)
}
lost[topic] = partitions
delete(nowAssigned, topic)
}
}
g.mu.Unlock()
})
}
if len(lost) > 0 {
// We must now stop fetching anything we lost and invalidate
// any buffered fetches before falling into onRevoked.
//
// We want to invalidate buffered fetches since they may
// contain partitions that we lost, and we do not want a future
// poll to return those fetches.
lostOffsets := make(map[string]map[int32]Offset, len(lost))
for lostTopic, lostPartitions := range lost {
lostPartitionOffsets := make(map[int32]Offset, len(lostPartitions))
for _, lostPartition := range lostPartitions {
lostPartitionOffsets[lostPartition] = Offset{}
}
lostOffsets[lostTopic] = lostPartitionOffsets
}
// We must invalidate before revoking and before updating
// uncommitted, because we want any commits in onRevoke to be
// for the final polled offsets. We do not want to allow the
// logical race of allowing fetches for revoked partitions
// after a revoke but before an invalidation.
g.c.mu.Lock()
g.c.assignPartitions(lostOffsets, assignInvalidateMatching, g.tps, "revoking assignments from cooperative consuming")
g.c.mu.Unlock()
}
if len(lost) > 0 || stage == revokeThisSession {
if len(lost) == 0 {
g.cfg.logger.Log(LogLevelInfo, "cooperative consumer calling onRevoke at the end of a session even though no partitions were lost", "group", g.cfg.group)
} else {
g.cfg.logger.Log(LogLevelInfo, "cooperative consumer calling onRevoke", "group", g.cfg.group, "lost", lost, "stage", stage)
}
if g.cfg.onRevoked != nil {
g.cfg.onRevoked(g.cl.ctx, g.cl, lost)
}
}
if len(lost) == 0 { // if we lost nothing, do nothing
return
}
if stage != revokeThisSession { // cooperative consumers rejoin after revoking what they lost
defer g.rejoin("cooperative rejoin after revoking what we lost from a rebalance")
}
// The block below deletes everything lost from our uncommitted map.
// All commits should be **completed** by the time this runs. An async
// commit can undo what we do below. The default revoke runs a sync
// commit.
g.mu.Lock()
defer g.mu.Unlock()
if g.uncommitted == nil {
return
}
for lostTopic, lostPartitions := range lost {
uncommittedPartitions := g.uncommitted[lostTopic]
if uncommittedPartitions == nil {
continue
}
for _, lostPartition := range lostPartitions {
delete(uncommittedPartitions, lostPartition)
}
if len(uncommittedPartitions) == 0 {
delete(g.uncommitted, lostTopic)
}
}
if len(g.uncommitted) == 0 {
g.uncommitted = nil
}
}
// assignRevokeSession aids in sequencing prerevoke/assign/revoke.
type assignRevokeSession struct {
prerevokeDone chan struct{}
assignDone chan struct{}
revokeDone chan struct{}
}
func newAssignRevokeSession() *assignRevokeSession {
return &assignRevokeSession{
prerevokeDone: make(chan struct{}),
assignDone: make(chan struct{}),
revokeDone: make(chan struct{}),
}
}
// For cooperative consumers, the first step is to diff the last assignment
// against the new assignment and revoke anything lost.
// We call this a "prerevoke".
func (s *assignRevokeSession) prerevoke(g *groupConsumer, lost map[string][]int32) <-chan struct{} {
go func() {
defer close(s.prerevokeDone)
if g.cooperative.Load() && len(lost) > 0 {
g.revoke(revokeLastSession, lost, false)
}
}()
return s.prerevokeDone
}
func (s *assignRevokeSession) assign(g *groupConsumer, newAssigned map[string][]int32) <-chan struct{} {
go func() {
defer close(s.assignDone)
<-s.prerevokeDone
if g.cfg.onAssigned != nil {
// We always call on assigned, even if nothing new is
// assigned. This allows consumers to know that
// assignment is done and do setup logic.
//
// If configured, we have to block polling.
g.c.waitAndAddRebalance()
defer g.c.unaddRebalance()
g.cfg.onAssigned(g.cl.ctx, g.cl, newAssigned)
}
}()
return s.assignDone
}
// At the end of a group session, before we leave the heartbeat loop, we call
// revoke. For non-cooperative consumers, this revokes everything in the
// current session, and before revoking, we invalidate all partitions. For the
// cooperative consumer, this does nothing but does notify the client that a
// revoke has begun / the group session is ending.
//
// This may not run before returning from the heartbeat loop: if we encounter a
// fatal error, we return before revoking so that we can instead call onLost in
// the manage loop.
func (s *assignRevokeSession) revoke(g *groupConsumer, leaving bool) <-chan struct{} {
go func() {
defer close(s.revokeDone)
<-s.assignDone
g.revoke(revokeThisSession, nil, leaving)
}()
return s.revokeDone
}
// This chunk of code "pre" revokes lost partitions for the cooperative
// consumer and then begins heartbeating while fetching offsets. This returns
// when heartbeating errors (or if fetch offsets errors).
//
// Before returning, this function ensures that
// - onAssigned is complete
// - which ensures that pre revoking is complete
// - fetching is complete
// - heartbeating is complete
func (g *groupConsumer) setupAssignedAndHeartbeat() (string, error) {
type hbquit struct {
rejoinWhy string
err error
}
hbErrCh := make(chan hbquit, 1)
fetchErrCh := make(chan error, 1)
s := newAssignRevokeSession()
added, lost := g.diffAssigned()
g.lastAssigned = g.nowAssigned.clone() // now that we are done with our last assignment, update it per the new assignment
g.cfg.logger.Log(LogLevelInfo, "new group session begun", "group", g.cfg.group, "added", mtps(added), "lost", mtps(lost))
s.prerevoke(g, lost) // for cooperative consumers
// Since we have joined the group, we immediately begin heartbeating.
// This will continue until the heartbeat errors, the group is killed,
// or the fetch offsets below errors.
ctx, cancel := context.WithCancel(g.ctx)
go func() {
defer cancel() // potentially kill offset fetching
g.cfg.logger.Log(LogLevelInfo, "beginning heartbeat loop", "group", g.cfg.group)
rejoinWhy, err := g.heartbeat(fetchErrCh, s)
hbErrCh <- hbquit{rejoinWhy, err}
}()
// We immediately begin fetching offsets. We want to wait until the
// fetch function returns, since it assumes within it that another
// assign cannot happen (it assigns partitions itself). Returning
// before the fetch completes would not be good.
//
// The difference between fetchDone and fetchErrCh is that fetchErrCh
// can kill heartbeating, or signal it to continue, while fetchDone
// is specifically used for this function's return.
fetchDone := make(chan struct{})
defer func() { <-fetchDone }()
// If cooperative consuming, we may have to resume fetches. See the
// comment on adjustCooperativeFetchOffsets.
if g.cooperative.Load() {
added = g.adjustCooperativeFetchOffsets(added, lost)
}
// Before we fetch offsets, we wait for the user's onAssign callback to
// be done. This ensures a few things:
//
// * that we wait for prerevoking to be done, which updates the
// uncommitted field. Waiting for that ensures that a rejoin and poll
// does not have weird concurrent interaction.
//
// * that our onLost will not be concurrent with onAssign
//
// * that the user can start up any per-partition processors necessary
// before we begin consuming that partition.
//
// We especially need to wait here because heartbeating may not
// necessarily run onRevoke before returning (because of a fatal
// error).
s.assign(g, added)
<-s.assignDone
if len(added) > 0 {
go func() {
defer close(fetchDone)
defer close(fetchErrCh)
fetchErrCh <- g.fetchOffsets(ctx, added)
}()
} else {
close(fetchDone)
close(fetchErrCh)
}
// Finally, we simply return whatever the heartbeat error is. This will
// be the fetch offset error if that function is what killed this.
done := <-hbErrCh
return done.rejoinWhy, done.err
}
// heartbeat issues heartbeat requests to Kafka for the duration of a group
// session.
//
// This function begins before fetching offsets to allow the consumer's
// onAssigned to be called before fetching. If the eventual offset fetch
// errors, we continue heartbeating until onRevoked finishes and our metadata
// is updated. If the error is not RebalanceInProgress, we return immediately.
//
// If the offset fetch is successful, then we basically sit in this function
// until a heartbeat errors or we, being the leader, decide to re-join.
func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSession) (string, error) {
ticker := time.NewTicker(g.cfg.heartbeatInterval)
defer ticker.Stop()
// We issue one heartbeat quickly if we are cooperative because
// cooperative consumers rejoin the group immediately, and we want to
// detect that in 500ms rather than 3s.
var cooperativeFastCheck <-chan time.Time
if g.cooperative.Load() {
cooperativeFastCheck = time.After(500 * time.Millisecond)
}
var metadone, revoked <-chan struct{}
var heartbeat, didMetadone, didRevoke bool
var rejoinWhy string
var lastErr error
ctxCh := g.ctx.Done()
for {
var err error
var force func(error)
heartbeat = false
select {
case <-cooperativeFastCheck:
heartbeat = true
case <-ticker.C:
heartbeat = true
case force = <-g.heartbeatForceCh:
heartbeat = true
case rejoinWhy = <-g.rejoinCh:
// If a metadata update changes our subscription,
// we just pretend we are rebalancing.
g.cfg.logger.Log(LogLevelInfo, "forced rejoin quitting heartbeat loop", "why", rejoinWhy)
err = kerr.RebalanceInProgress
case err = <-fetchErrCh:
fetchErrCh = nil
case <-metadone:
metadone = nil
didMetadone = true
case <-revoked:
revoked = nil
didRevoke = true
case <-ctxCh:
// Even if the group is left, we need to wait for our
// revoke to finish before returning, otherwise the
// manage goroutine will race with us setting
// nowAssigned.
ctxCh = nil
err = context.Canceled
}
if heartbeat {
g.cfg.logger.Log(LogLevelDebug, "heartbeating", "group", g.cfg.group)
req := kmsg.NewPtrHeartbeatRequest()
req.Group = g.cfg.group
req.Generation = g.generation
req.MemberID = g.memberID
req.InstanceID = g.cfg.instanceID
var resp *kmsg.HeartbeatResponse
if resp, err = req.RequestWith(g.ctx, g.cl); err == nil {
err = kerr.ErrorForCode(resp.ErrorCode)
}
g.cfg.logger.Log(LogLevelDebug, "heartbeat complete", "group", g.cfg.group, "err", err)
if force != nil {
force(err)
}
}
// The first error either triggers a clean revoke and metadata
// update or it returns immediately. If we triggered the
// revoke, we wait for it to complete regardless of any future
// error.
if didMetadone && didRevoke {
return rejoinWhy, lastErr
}
if err == nil {
continue
}
if lastErr == nil {
g.cfg.logger.Log(LogLevelInfo, "heartbeat errored", "group", g.cfg.group, "err", err)
} else {
g.cfg.logger.Log(LogLevelInfo, "heartbeat errored again while waiting for user revoke to finish", "group", g.cfg.group, "err", err)
}
// Since we errored, we must revoke.
if !didRevoke && revoked == nil {
// If our error is not from rebalancing, then we
// encountered IllegalGeneration or UnknownMemberID or
// our context closed, all of which are unexpected and
// unrecoverable.
//
// We return early rather than revoking and updating
// metadata; the groupConsumer's manage function will
// call onLost with all partitions.
//
// setupAssignedAndHeartbeat still waits for onAssigned
// to be done so that we avoid calling onLost
// concurrently.
if !errors.Is(err, kerr.RebalanceInProgress) && revoked == nil {
return "", err
}
// Now we call the user provided revoke callback, even
// if cooperative: if cooperative, this only revokes
// partitions we no longer want to consume.
//
// If the err is context.Canceled, the group is being
// left and we revoke everything.
revoked = s.revoke(g, errors.Is(err, context.Canceled))
}
// Since we errored, while waiting for the revoke to finish, we
// update our metadata. A leader may have re-joined with new
// metadata, and we want the update.
if !didMetadone && metadone == nil {
waited := make(chan struct{})
metadone = waited
go func() {
g.cl.waitmeta(g.ctx, g.cfg.sessionTimeout, "waitmeta after heartbeat error")
close(waited)
}()
}
// We always save the latest error; generally this should be
// REBALANCE_IN_PROGRESS, but if the revoke takes too long,
// Kafka may boot us and we will get a different error.
lastErr = err
}
}
// ForceRebalance quits a group member's heartbeat loop so that the member
// rejoins with a JoinGroupRequest.
//
// This function is only useful if you either (a) know that the group member is
// a leader, and want to force a rebalance for any particular reason, or (b)
// are using a custom group balancer, and have changed the metadata that will
// be returned from its JoinGroupMetadata method. This function has no other
// use; see KIP-568 for more details around this function's motivation.
//
// If neither of the cases above are true (this member is not a leader, and the
// join group metadata has not changed), then Kafka will not actually trigger a
// rebalance and will instead reply to the member with its current assignment.
func (cl *Client) ForceRebalance() {
if g := cl.consumer.g; g != nil {
g.rejoin("rejoin from ForceRebalance")
}
}
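// A sketch of when ForceRebalance might be called (illustrative; the custom
// balancer and its changed JoinGroupMetadata are assumptions of the example):
//
//	// After changing state that a custom group balancer encodes into its
//	// JoinGroupMetadata, nudge the group so the new metadata is exchanged.
//	cl.ForceRebalance()
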
// rejoin is called after a cooperative member revokes what it lost at the
// beginning of a session, or if we are leader and detect new partitions to
// consume.
func (g *groupConsumer) rejoin(why string) {
select {
case g.rejoinCh <- why:
default:
}
}
// Joins and then syncs, issuing the two slow requests in goroutines to allow
// for group cancelation to return early.
func (g *groupConsumer) joinAndSync(joinWhy string) error {
g.cfg.logger.Log(LogLevelInfo, "joining group", "group", g.cfg.group)
g.leader.Store(false)
g.getAndResetExternalRejoin()
defer func() {
// If we are not leader, we clear any tracking of external
// topics from when we were previously leader, since tracking
// these is just a waste.