diff --git a/workers/kafkaproducer.go b/workers/kafkaproducer.go
index 35870cea..a08ed774 100644
--- a/workers/kafkaproducer.go
+++ b/workers/kafkaproducer.go
@@ -26,6 +26,7 @@ type KafkaProducer struct {
 	kafkaConnected bool
 	compressCodec  compress.Codec
 	kafkaConns     map[int]*kafka.Conn // Map to store connections by partition
+	lastPartitionIndex *int
 }
 
 func NewKafkaProducer(config *pkgconfig.Config, logger *logger.Logger, name string) *KafkaProducer {
@@ -218,25 +219,24 @@ func (w *KafkaProducer) FlushBuffer(buf *[]dnsutils.DNSMessage) {
 	// add support for msg compression and round robin
 	var err error
 	if partition == nil {
-		index := 0
+		if w.lastPartitionIndex == nil {
+			w.lastPartitionIndex = new(int) // Initialize the index the first time
+		}
 		numPartitions := len(w.kafkaConns)
-		for _, msg := range msgs {
-			conn := w.kafkaConns[index]
-			if w.GetConfig().Loggers.KafkaProducer.Compression == pkgconfig.CompressNone {
-				_, err = conn.WriteMessages(msg)
-			} else {
-				_, err = conn.WriteCompressedMessages(w.compressCodec, msg)
-			}
-			if err != nil {
-				w.LogError("unable to write message", err.Error())
-				w.kafkaConnected = false
-				<-w.kafkaReconnect
-				break
-			}
-
-			// Move to the next partition in round-robin fashion
-			index = (index + 1) % numPartitions
+		conn := w.kafkaConns[*w.lastPartitionIndex]
+		if w.GetConfig().Loggers.KafkaProducer.Compression == pkgconfig.CompressNone {
+			_, err = conn.WriteMessages(msgs...)
+		} else {
+			_, err = conn.WriteCompressedMessages(w.compressCodec, msgs...)
 		}
+		if err != nil {
+			w.LogError("unable to write message", err.Error())
+			w.kafkaConnected = false
+			<-w.kafkaReconnect
+		}
+
+		// Move to the next partition in round-robin fashion
+		*w.lastPartitionIndex = (*w.lastPartitionIndex + 1) % numPartitions
 	} else {
 		conn := w.kafkaConns[*partition]
 		if w.GetConfig().Loggers.KafkaProducer.Compression == pkgconfig.CompressNone {
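
For reference, here is a minimal standalone sketch of the pattern this change introduces: the round-robin partition index is persisted on the producer as a *int (so "never initialized" stays distinguishable from "partition 0"), and each flush writes the whole batch to one partition before advancing, instead of rotating per message. The producer type and writeBatch function are illustrative stand-ins, not the real worker API.

package main

import "fmt"

// producer stands in for the real KafkaProducer worker.
type producer struct {
	lastPartitionIndex *int // persisted across flushes; nil until first use
	numPartitions      int
}

// writeBatch sends one whole batch to the current partition, then
// advances the index so the next batch lands on the next partition.
func (p *producer) writeBatch(batch []string) {
	if p.lastPartitionIndex == nil {
		p.lastPartitionIndex = new(int) // initialize the index the first time
	}
	fmt.Printf("partition %d <- %d messages\n", *p.lastPartitionIndex, len(batch))

	// Move to the next partition in round-robin fashion.
	*p.lastPartitionIndex = (*p.lastPartitionIndex + 1) % p.numPartitions
}

func main() {
	p := &producer{numPartitions: 3}
	for i := 0; i < 5; i++ {
		p.writeBatch([]string{"q1", "q2"})
	}
	// Prints partitions 0, 1, 2, 0, 1: the index survives between calls,
	// which is what the *int field on the struct buys over a local variable.
}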