diff --git a/examples/goroutine_per_partition_consuming_manual_commit/main.go b/examples/goroutine_per_partition_consuming_manual_commit/main.go index 2c3f5ed4..f700069e 100644 --- a/examples/goroutine_per_partition_consuming_manual_commit/main.go +++ b/examples/goroutine_per_partition_consuming_manual_commit/main.go @@ -25,13 +25,12 @@ var ( ) func (pc *pconsumer) consume(topic string, partition int32, cl *kgo.Client) { - rand.Seed(time.Now().Unix()) fmt.Printf("Starting consume for t %s p %d\n", topic, partition) - defer fmt.Printf("Killing consume for t %s p %d\n", topic, partition) for { select { case <-pc.quit: pc.done <- struct{}{} + fmt.Printf("Closing consume for t %s p %d\n", topic, partition) return case recs := <-pc.recs: // Mimick work to happen before committing records @@ -81,12 +80,14 @@ func (s *splitConsume) lost(_ context.Context, cl *kgo.Client, lost map[string][ delete(s.consumers, topic) } close(pc.quit) + fmt.Printf("Waiting for work to finish t %s p %d\n", topic, partition) <-pc.done } } } func main() { + rand.Seed(time.Now().Unix()) flag.Parse() if len(*group) == 0 { @@ -110,6 +111,7 @@ func main() { kgo.OnPartitionsRevoked(s.lost), kgo.OnPartitionsLost(s.lost), kgo.DisableAutoCommit(), + kgo.BlockRebalanceOnPoll(), } cl, err := kgo.NewClient(opts...) @@ -152,5 +154,8 @@ func (s *splitConsume) poll(cl *kgo.Client) { } }) }) + s.mu.Lock() + cl.AllowRebalance() + s.mu.Unlock() } }