Skip to content

Commit

Permalink
consumer: fix potential slowReloads problem
Browse files Browse the repository at this point in the history
  • Loading branch information
twmb committed Apr 16, 2021
1 parent 111f922 commit 89bee85
Showing 1 changed file with 17 additions and 6 deletions.
23 changes: 17 additions & 6 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,19 +645,19 @@ func (l *listOrEpochLoads) addLoad(t string, p int32, loadType listOrEpochLoadTy

func (l *listOrEpochLoads) removeLoad(t string, p int32) {
for _, m := range []*offsetLoadMap{
&l.list,
&l.epoch,
l.list,
l.epoch,
} {
if *m == nil {
if m == nil {
continue
}
ps := (*m)[t]
ps := m[t]
if ps == nil {
continue
}
delete(ps, p)
if len(ps) == 0 {
delete(*m, t)
delete(m, t)
}
}
}
Expand Down Expand Up @@ -1052,17 +1052,28 @@ func (s *consumerSession) handleListOrEpochResults(loaded loadedOffsets) {
}
s.listOrEpochMu.Unlock()

// We now add our immediate reloads back to the session. We are
// still in the context of the live session itself because this
// handling function is run with a session worker.
reloads.loadWithSession(s)
if !slowReloads.isEmpty() {
s.incWorker()
go func() {
// Before we dec our worker, we must add the slow
// reloads back into the session's waiting loads.
// Doing so allows a concurrent stopSession to
// track the waiting loads, whereas if we did not
// add things back to the session, we could abandon
// loading these offsets and have a stuck cursor.
defer s.decWorker()
defer slowReloads.loadWithSession(s)
after := time.NewTimer(time.Second)
defer after.Stop()
select {
case <-after.C:
case <-s.ctx.Done():
return
}
slowReloads.loadWithSession(s)
}()
}
}()
Expand Down

0 comments on commit 89bee85

Please sign in to comment.