Skip to content

Commit

Permalink
Use kgo.BlockRebalanceOnPoll and AllowRebalance
Browse files Browse the repository at this point in the history
  • Loading branch information
JacobSMoller committed Mar 1, 2022
1 parent 75ac0dd commit e52f7ae
Showing 1 changed file with 7 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@ var (
)

func (pc *pconsumer) consume(topic string, partition int32, cl *kgo.Client) {
rand.Seed(time.Now().Unix())
fmt.Printf("Starting consume for t %s p %d\n", topic, partition)
defer fmt.Printf("Killing consume for t %s p %d\n", topic, partition)
for {
select {
case <-pc.quit:
pc.done <- struct{}{}
fmt.Printf("Closing consume for t %s p %d\n", topic, partition)
return
case recs := <-pc.recs:
// Mimick work to happen before committing records
Expand Down Expand Up @@ -81,12 +80,14 @@ func (s *splitConsume) lost(_ context.Context, cl *kgo.Client, lost map[string][
delete(s.consumers, topic)
}
close(pc.quit)
fmt.Printf("Waiting for work to finish t %s p %d\n", topic, partition)
<-pc.done
}
}
}

func main() {
rand.Seed(time.Now().Unix())
flag.Parse()

if len(*group) == 0 {
Expand All @@ -110,6 +111,7 @@ func main() {
kgo.OnPartitionsRevoked(s.lost),
kgo.OnPartitionsLost(s.lost),
kgo.DisableAutoCommit(),
kgo.BlockRebalanceOnPoll(),
}

cl, err := kgo.NewClient(opts...)
Expand Down Expand Up @@ -152,5 +154,8 @@ func (s *splitConsume) poll(cl *kgo.Client) {
}
})
})
s.mu.Lock()
cl.AllowRebalance()
s.mu.Unlock()
}
}

0 comments on commit e52f7ae

Please sign in to comment.