Skip to content

Commit

Permalink
apply round robin on bach (#809)
Browse files Browse the repository at this point in the history
  • Loading branch information
dmachard authored Sep 12, 2024
1 parent 9f274af commit 3314f06
Showing 1 changed file with 17 additions and 17 deletions.
34 changes: 17 additions & 17 deletions workers/kafkaproducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type KafkaProducer struct {
kafkaConnected bool
compressCodec compress.Codec
kafkaConns map[int]*kafka.Conn // Map to store connections by partition
lastPartitionIndex *int
}

func NewKafkaProducer(config *pkgconfig.Config, logger *logger.Logger, name string) *KafkaProducer {
Expand Down Expand Up @@ -218,25 +219,24 @@ func (w *KafkaProducer) FlushBuffer(buf *[]dnsutils.DNSMessage) {
// add support for msg compression and round robin
var err error
if partition == nil {
index := 0
if w.lastPartitionIndex == nil {
w.lastPartitionIndex = new(int) // Initialiser l'index la première fois
}
numPartitions := len(w.kafkaConns)
for _, msg := range msgs {
conn := w.kafkaConns[index]
if w.GetConfig().Loggers.KafkaProducer.Compression == pkgconfig.CompressNone {
_, err = conn.WriteMessages(msg)
} else {
_, err = conn.WriteCompressedMessages(w.compressCodec, msg)
}
if err != nil {
w.LogError("unable to write message", err.Error())
w.kafkaConnected = false
<-w.kafkaReconnect
break
}

// Move to the next partition in round-robin fashion
index = (index + 1) % numPartitions
conn := w.kafkaConns[*w.lastPartitionIndex]
if w.GetConfig().Loggers.KafkaProducer.Compression == pkgconfig.CompressNone {
_, err = conn.WriteMessages(msgs...)
} else {
_, err = conn.WriteCompressedMessages(w.compressCodec, msgs...)
}
if err != nil {
w.LogError("unable to write message", err.Error())
w.kafkaConnected = false
<-w.kafkaReconnect
}

// Move to the next partition in round-robin fashion
*w.lastPartitionIndex = (*w.lastPartitionIndex + 1) % numPartitions
} else {
conn := w.kafkaConns[*partition]
if w.GetConfig().Loggers.KafkaProducer.Compression == pkgconfig.CompressNone {
Expand Down

0 comments on commit 3314f06

Please sign in to comment.