Skip to content

Commit

Permalink
adds a function to get consumer handle
Browse files Browse the repository at this point in the history
  • Loading branch information
Shubhang Balkundi committed Mar 25, 2024
1 parent 076f7f1 commit dab18bb
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 0 deletions.
17 changes: 17 additions & 0 deletions kafka/chandle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package kafka

import (
"errors"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func ConsumerHandle(group *ConsumerGroup) (*kafka.Consumer, error) {
if group.c == nil {
return nil, errors.New("consumer handle not initialized")
}
kc, ok := group.c.(*kafka.Consumer)
if !ok {
return nil, errors.New("could not assert to *kafka.Consumer")
}
return kc, nil
}
2 changes: 2 additions & 0 deletions kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type ConsumerGroup struct {
Logger ziggurat.StructuredLogger
GroupConfig ConsumerConfig
wg *sync.WaitGroup
c confluentConsumer
consumerMakeFunc func(*kafka.ConfigMap, []string) confluentConsumer
}

Expand All @@ -34,6 +35,7 @@ func (cg *ConsumerGroup) Consume(ctx context.Context, handler ziggurat.Handler)

cm := cg.GroupConfig.toConfigMap()
confCons := cg.consumerMakeFunc(&cm, cg.GroupConfig.Topics)
cg.c = confCons
for i := 0; i < consConf.ConsumerCount; i++ {
workerID := fmt.Sprintf("%s_%d", groupID, i)
w := worker{
Expand Down

0 comments on commit dab18bb

Please sign in to comment.