From dab18bb6c101fe636a6fcefe468c3f34623fdd1d Mon Sep 17 00:00:00 2001 From: Shubhang Balkundi Date: Mon, 25 Mar 2024 13:20:12 +0530 Subject: [PATCH] adds a function to get consumer handle --- kafka/chandle.go | 17 +++++++++++++++++ kafka/consumer.go | 2 ++ 2 files changed, 19 insertions(+) create mode 100644 kafka/chandle.go diff --git a/kafka/chandle.go b/kafka/chandle.go new file mode 100644 index 0000000..bc0158b --- /dev/null +++ b/kafka/chandle.go @@ -0,0 +1,17 @@ +package kafka + +import ( + "errors" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +func ConsumerHandle(group *ConsumerGroup) (*kafka.Consumer, error) { + if group.c == nil { + return nil, errors.New("consumer handle not initialized") + } + kc, ok := group.c.(*kafka.Consumer) + if !ok { + return nil, errors.New("could not assert to *kafka.Consumer") + } + return kc, nil +} diff --git a/kafka/consumer.go b/kafka/consumer.go index fe2cee3..9b50165 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -17,6 +17,7 @@ type ConsumerGroup struct { Logger ziggurat.StructuredLogger GroupConfig ConsumerConfig wg *sync.WaitGroup + c confluentConsumer consumerMakeFunc func(*kafka.ConfigMap, []string) confluentConsumer } @@ -34,6 +35,7 @@ func (cg *ConsumerGroup) Consume(ctx context.Context, handler ziggurat.Handler) cm := cg.GroupConfig.toConfigMap() confCons := cg.consumerMakeFunc(&cm, cg.GroupConfig.Topics) + cg.c = confCons for i := 0; i < consConf.ConsumerCount; i++ { workerID := fmt.Sprintf("%s_%d", groupID, i) w := worker{