diff --git a/kafka/chandle.go b/kafka/chandle.go new file mode 100644 index 0000000..bc0158b --- /dev/null +++ b/kafka/chandle.go @@ -0,0 +1,17 @@ +package kafka + +import ( + "errors" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +func ConsumerHandle(group *ConsumerGroup) (*kafka.Consumer, error) { + if group.c == nil { + return nil, errors.New("consumer handle not initialized") + } + kc, ok := group.c.(*kafka.Consumer) + if !ok { + return nil, errors.New("could not assert to *kafka.Consumer") + } + return kc, nil +} diff --git a/kafka/consumer.go b/kafka/consumer.go index fe2cee3..9b50165 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -17,6 +17,7 @@ type ConsumerGroup struct { Logger ziggurat.StructuredLogger GroupConfig ConsumerConfig wg *sync.WaitGroup + c confluentConsumer consumerMakeFunc func(*kafka.ConfigMap, []string) confluentConsumer } @@ -34,6 +35,7 @@ func (cg *ConsumerGroup) Consume(ctx context.Context, handler ziggurat.Handler) cm := cg.GroupConfig.toConfigMap() confCons := cg.consumerMakeFunc(&cm, cg.GroupConfig.Topics) + cg.c = confCons for i := 0; i < consConf.ConsumerCount; i++ { workerID := fmt.Sprintf("%s_%d", groupID, i) w := worker{