From 3466977b9c1f18c5837bed4441e0d1a763887056 Mon Sep 17 00:00:00 2001 From: Igor Guedes Rodrigues Date: Fri, 3 Jan 2025 07:32:33 -0300 Subject: [PATCH] prevent stuck --- internal/pkg/kafka/console_consumer.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/internal/pkg/kafka/console_consumer.go b/internal/pkg/kafka/console_consumer.go index d682ada..e3e4329 100644 --- a/internal/pkg/kafka/console_consumer.go +++ b/internal/pkg/kafka/console_consumer.go @@ -41,23 +41,30 @@ func ConsumeMessage(conn *KafkaConnection, topic string, group string, verbose b client, err := sarama.NewConsumerGroup(conn.Context.BootstrapServers, group, cgConfig) util.CheckErr(err) + return err wg := &sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() + defer close(consumer.ready) + defer cancel() for { err := client.Consume(ctx, []string{topic}, &consumer) util.CheckErr(err) - if ctx.Err() != nil { + if ctx.Err() != nil || err != nil { break } - consumer.ready = make(chan bool) + } }() - <-consumer.ready + select { + case <-consumer.ready: + case <-ctx.Done(): + return nil + } log.Println("Consumer running, waiting for events...") sigterm := make(chan os.Signal, 1)