support round robin partitioner for kafka (#759)
* support round robin partitioner for kafka
* some fixes
dmachard authored Jul 1, 2024
1 parent 8caba0e commit 131a4e9
Showing 8 changed files with 93 additions and 130 deletions.
25 changes: 0 additions & 25 deletions docs/_integration/elasticsearch/config.deprecated.yml

This file was deleted.

33 changes: 0 additions & 33 deletions docs/_integration/fluentd/config.deprecated.yml

This file was deleted.

36 changes: 0 additions & 36 deletions docs/_integration/kafka/config.deprecated.yml

This file was deleted.

4 changes: 2 additions & 2 deletions docs/_integration/kafka/config.yml
@@ -15,7 +15,7 @@ pipelines:
 
   - name: kafka
     kafkaproducer:
-      remote-address: 127.0.0.1
+      remote-address: 192.168.1.16
       remote-port: 9092
       connect-timeout: 5
       retry-interval: 10
@@ -29,6 +29,6 @@ pipelines:
       mode: flat-json
       buffer-size: 100
       topic: "dnscollector"
-      partition: 0
+      partition: null
      chan-buffer-size: 4096
      compression: none
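
With `partition: null`, messages should now be spread across every partition of the `dnscollector` topic instead of landing only on partition 0. One way to verify the spread is to compare per-partition offsets. The sketch below is illustrative only: the broker address and topic are assumptions taken from the example config above, and it uses the same segmentio/kafka-go client the collector relies on.

```go
// Sketch: print per-partition message counts for the example topic so the
// round-robin spread can be eyeballed. Broker address and topic are
// assumptions taken from the example config above.
package main

import (
	"context"
	"fmt"
	"log"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	ctx := context.Background()
	dialer := &kafka.Dialer{}

	partitions, err := dialer.LookupPartitions(ctx, "tcp", "192.168.1.16:9092", "dnscollector")
	if err != nil {
		log.Fatal(err)
	}
	for _, p := range partitions {
		conn, err := dialer.DialLeader(ctx, "tcp", "192.168.1.16:9092", p.Topic, p.ID)
		if err != nil {
			log.Fatal(err)
		}
		first, _ := conn.ReadFirstOffset()
		last, _ := conn.ReadLastOffset()
		fmt.Printf("partition %d: %d messages (offsets %d..%d)\n", p.ID, last-first, first, last)
		conn.Close()
	}
}
```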
2 changes: 1 addition & 1 deletion docs/examples.md
@@ -5,7 +5,7 @@ You will find below some examples of configurations to manage your DNS logs.
 
 - Pipelines running mode
   - [x] [Advanced example with DNSmessage collector](./_examples/use-case-24.yml)
-  - [x] [How to only log slow responses and errors only?](./_examples/use-case-25.yml)
+  - [x] [How can I log only slow responses and errors?](./_examples/use-case-25.yml)
 
 - Capture DNS traffic from incoming DNSTap streams
   - [x] [Read from UNIX DNSTap socket and forward it to TLS stream](./_examples/use-case-5.yml)
5 changes: 3 additions & 2 deletions docs/loggers/logger_kafka.md
@@ -60,13 +60,14 @@ Options:
 > output text format, please refer to the default text format to see all available [directives](../configuration.md#custom-text-format), use this parameter if you want a specific format
 * `buffer-size` (integer)
-  > Specifies the size of the buffer for DNS messages before they are sent to Kafka.
+  > Specifies how many DNS messages are batched together before they are sent to Kafka.
 * `topic` (string)
   > Specifies the Kafka topic to which messages will be forwarded.
 * `partition` (integer)
   > Specifies the Kafka partition to which messages will be sent.
+  > If the `partition` parameter is null, the `round-robin` partitioner is used to spread messages across all partitions (default behavior).
 * `chan-buffer-size` (integer)
   > Specifies the maximum number of packets that can be buffered before additional packets are discarded.
@@ -95,7 +96,7 @@ kafkaproducer:
   text-format: ""
   buffer-size: 100
   topic: "dnscollector"
-  partition: 0
+  partition: null
   chan-buffer-size: 0
   compression: none
 ```
2 changes: 1 addition & 1 deletion pkgconfig/loggers.go
@@ -292,7 +292,7 @@ type ConfigLoggers struct {
 		FlushInterval     int    `yaml:"flush-interval" default:"10"`
 		ConnectTimeout    int    `yaml:"connect-timeout" default:"5"`
 		Topic             string `yaml:"topic" default:"dnscollector"`
-		Partition         int    `yaml:"partition" default:"0"`
+		Partition         *int   `yaml:"partition" default:"nil"`
 		ChannelBufferSize int    `yaml:"chan-buffer-size" default:"0"`
 		Compression       string `yaml:"compression" default:"none"`
 	} `yaml:"kafkaproducer"`
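
The switch from `int` to `*int` is what makes the new behavior possible: a plain `int` cannot distinguish an explicit `partition: 0` from an absent or `null` value. A minimal sketch of the three cases, assuming a standard yaml.v3 unmarshal rather than dnscollector's actual config loader:

```go
// Sketch: *int distinguishes "null / unset" (round-robin) from an explicit 0.
package main

import (
	"fmt"

	"gopkg.in/yaml.v3" // assumed here for illustration
)

type kafkaConf struct {
	Partition *int `yaml:"partition"`
}

func main() {
	for _, doc := range []string{"partition: null", "partition: 0", "{}"} {
		var c kafkaConf
		if err := yaml.Unmarshal([]byte(doc), &c); err != nil {
			panic(err)
		}
		if c.Partition == nil {
			fmt.Printf("%-18s -> round-robin over all partitions\n", doc)
		} else {
			fmt.Printf("%-18s -> pinned to partition %d\n", doc, *c.Partition)
		}
	}
}
```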
116 changes: 86 additions & 30 deletions workers/kafkaproducer.go
@@ -22,20 +22,23 @@ import (
 type KafkaProducer struct {
 	*GenericWorker
 	textFormat                 []string
-	kafkaConn                  *kafka.Conn
 	kafkaReady, kafkaReconnect chan bool
 	kafkaConnected             bool
 	compressCodec              compress.Codec
+	kafkaConns                 map[int]*kafka.Conn // Map to store connections by partition
 }
 
 func NewKafkaProducer(config *pkgconfig.Config, logger *logger.Logger, name string) *KafkaProducer {
 	bufSize := config.Global.Worker.ChannelBufferSize
 	if config.Loggers.KafkaProducer.ChannelBufferSize > 0 {
 		bufSize = config.Loggers.KafkaProducer.ChannelBufferSize
 	}
-	w := &KafkaProducer{GenericWorker: NewGenericWorker(config, logger, name, "kafka", bufSize, pkgconfig.DefaultMonitor)}
-	w.kafkaReady = make(chan bool)
-	w.kafkaReconnect = make(chan bool)
+	w := &KafkaProducer{
+		GenericWorker:  NewGenericWorker(config, logger, name, "kafka", bufSize, pkgconfig.DefaultMonitor),
+		kafkaReady:     make(chan bool),
+		kafkaReconnect: make(chan bool),
+		kafkaConns:     make(map[int]*kafka.Conn),
+	}
 	w.ReadConfig()
 	return w
 }
@@ -66,26 +69,33 @@ func (w *KafkaProducer) ReadConfig() {
 }
 
 func (w *KafkaProducer) Disconnect() {
-	if w.kafkaConn != nil {
-		w.LogInfo("closing connection")
-		w.kafkaConn.Close()
+	// Close all Kafka connections
+	for _, conn := range w.kafkaConns {
+		if conn != nil {
+			w.LogInfo("closing connection per partition")
+			conn.Close()
+		}
 	}
+	w.kafkaConns = make(map[int]*kafka.Conn) // Clear the map
 }
 
 func (w *KafkaProducer) ConnectToKafka(ctx context.Context, readyTimer *time.Timer) {
 	for {
 		readyTimer.Reset(time.Duration(10) * time.Second)
 
-		if w.kafkaConn != nil {
-			w.kafkaConn.Close()
-			w.kafkaConn = nil
+		if len(w.kafkaConns) > 0 {
+			w.Disconnect()
 		}
 
 		topic := w.GetConfig().Loggers.KafkaProducer.Topic
 		partition := w.GetConfig().Loggers.KafkaProducer.Partition
 		address := w.GetConfig().Loggers.KafkaProducer.RemoteAddress + ":" + strconv.Itoa(w.GetConfig().Loggers.KafkaProducer.RemotePort)
 
-		w.LogInfo("connecting to kafka=%s partition=%d topic=%s", address, partition, topic)
+		if partition == nil {
+			w.LogInfo("connecting to kafka=%s partition=all topic=%s", address, topic)
+		} else {
+			w.LogInfo("connecting to kafka=%s partition=%d topic=%s", address, *partition, topic)
+		}
 
 		dialer := &kafka.Dialer{
 			Timeout: time.Duration(w.GetConfig().Loggers.KafkaProducer.ConnectTimeout) * time.Second,
@@ -133,16 +143,39 @@ func (w *KafkaProducer) ConnectToKafka(ctx context.Context, readyTimer *time.Timer) {
 
 		}
 
-		// connect
-		conn, err := dialer.DialLeader(ctx, "tcp", address, topic, partition)
-		if err != nil {
-			w.LogError("%s", err)
-			w.LogInfo("retry to connect in %d seconds", w.GetConfig().Loggers.KafkaProducer.RetryInterval)
-			time.Sleep(time.Duration(w.GetConfig().Loggers.KafkaProducer.RetryInterval) * time.Second)
-			continue
-		}
+		var conn *kafka.Conn
+		var err error
 
-		w.kafkaConn = conn
+		if partition == nil {
+			// Lookup partitions and create connections for each
+			partitions, err := dialer.LookupPartitions(ctx, "tcp", address, topic)
+			if err != nil {
+				w.LogError("failed to lookup partitions: %s", err)
+				w.LogInfo("retry to connect in %d seconds", w.GetConfig().Loggers.KafkaProducer.RetryInterval)
+				time.Sleep(time.Duration(w.GetConfig().Loggers.KafkaProducer.RetryInterval) * time.Second)
+				continue
+			}
+			for _, p := range partitions {
+				conn, err = dialer.DialLeader(ctx, "tcp", address, p.Topic, p.ID)
+				if err != nil {
+					w.LogError("failed to dial leader for partition %d: %s", p.ID, err)
+					w.LogInfo("retry to connect in %d seconds", w.GetConfig().Loggers.KafkaProducer.RetryInterval)
+					time.Sleep(time.Duration(w.GetConfig().Loggers.KafkaProducer.RetryInterval) * time.Second)
+					continue
+				}
+				w.kafkaConns[p.ID] = conn
+			}
+		} else {
+			// DialLeader directly for the pinned partition
+			conn, err = dialer.DialLeader(ctx, "tcp", address, topic, *partition)
+			if err != nil {
+				w.LogError("failed to dial leader for partition %d and topic %s: %s", *partition, topic, err)
+				w.LogInfo("retry to connect in %d seconds", w.GetConfig().Loggers.KafkaProducer.RetryInterval)
+				time.Sleep(time.Duration(w.GetConfig().Loggers.KafkaProducer.RetryInterval) * time.Second)
+				continue
+			}
+			w.kafkaConns[*partition] = conn
+		}
 
 		// block until is ready
 		w.kafkaReady <- true
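
The per-partition dialing above is needed because the low-level `kafka.Conn` is bound to a single partition leader and has no partitioner of its own. For comparison, here is a sketch of the same round-robin behavior using kafka-go's higher-level `kafka.Writer`, which discovers leaders and balances internally; the broker and topic values are assumptions carried over from the example config:

```go
// Sketch: the built-in RoundRobin balancer of kafka.Writer achieves the same
// distribution without managing one connection per partition by hand.
package main

import (
	"context"
	"log"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	w := &kafka.Writer{
		Addr:     kafka.TCP("192.168.1.16:9092"), // assumed broker
		Topic:    "dnscollector",                 // assumed topic
		Balancer: &kafka.RoundRobin{},            // spread writes across partitions
	}
	defer w.Close()

	err := w.WriteMessages(context.Background(),
		kafka.Message{Value: []byte(`{"qname":"example.com."}`)},
		kafka.Message{Value: []byte(`{"qname":"example.org."}`)},
	)
	if err != nil {
		log.Fatal(err)
	}
}
```

The commit keeps the `kafka.Conn` approach, presumably to preserve the existing connect/retry/TLS plumbing built around it.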
@@ -154,6 +187,7 @@ func (w *KafkaProducer) FlushBuffer(buf *[]dnsutils.DNSMessage) {
 	msgs := []kafka.Message{}
 	buffer := new(bytes.Buffer)
 	strDm := ""
+	partition := w.GetConfig().Loggers.KafkaProducer.Partition
 
 	for _, dm := range *buf {
 		switch w.GetConfig().Loggers.KafkaProducer.Mode {
@@ -181,18 +215,40 @@ func (w *KafkaProducer) FlushBuffer(buf *[]dnsutils.DNSMessage) {
 
 	}
 
-	// add support for msg compression
+	// add support for msg compression and round robin
 	var err error
-	if w.GetConfig().Loggers.KafkaProducer.Compression == pkgconfig.CompressNone {
-		_, err = w.kafkaConn.WriteMessages(msgs...)
-	} else {
-		_, err = w.kafkaConn.WriteCompressedMessages(w.compressCodec, msgs...)
-	}
-
-	if err != nil {
-		w.LogError("unable to write message", err.Error())
-		w.kafkaConnected = false
-		<-w.kafkaReconnect
+	if partition == nil {
+		index := 0
+		numPartitions := len(w.kafkaConns)
+		for _, msg := range msgs {
+			conn := w.kafkaConns[index]
+			if w.GetConfig().Loggers.KafkaProducer.Compression == pkgconfig.CompressNone {
+				_, err = conn.WriteMessages(msg)
+			} else {
+				_, err = conn.WriteCompressedMessages(w.compressCodec, msg)
+			}
+			if err != nil {
+				w.LogError("unable to write message: %s", err.Error())
+				w.kafkaConnected = false
+				<-w.kafkaReconnect
+				break
+			}
+
+			// Move to the next partition in round-robin fashion
+			index = (index + 1) % numPartitions
+		}
+	} else {
+		conn := w.kafkaConns[*partition]
+		if w.GetConfig().Loggers.KafkaProducer.Compression == pkgconfig.CompressNone {
+			_, err = conn.WriteMessages(msgs...)
+		} else {
+			_, err = conn.WriteCompressedMessages(w.compressCodec, msgs...)
+		}
+		if err != nil {
+			w.LogError("unable to write message: %s", err.Error())
+			w.kafkaConnected = false
+			<-w.kafkaReconnect
+		}
 	}
 
 	// reset buffer
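
One caveat worth noting: the round-robin loop above indexes `kafkaConns` with a bare counter, which assumes partition IDs form the contiguous range 0..n-1 (normally true for Kafka topics, but not guaranteed after unusual reassignments). A defensive variant would rotate over the partition IDs actually present; a sketch, with names chosen for illustration:

```go
// Sketch: round-robin over the real partition IDs instead of assuming 0..n-1.
package sketch

import (
	"fmt"
	"sort"

	kafka "github.com/segmentio/kafka-go"
)

// writeRoundRobin spreads one flushed batch across the per-partition
// connections, keyed by the IDs actually present in the map.
func writeRoundRobin(conns map[int]*kafka.Conn, msgs []kafka.Message) error {
	ids := make([]int, 0, len(conns))
	for id := range conns {
		ids = append(ids, id)
	}
	sort.Ints(ids) // stable rotation order

	for i, msg := range msgs {
		id := ids[i%len(ids)]
		if _, err := conns[id].WriteMessages(msg); err != nil {
			return fmt.Errorf("write to partition %d: %w", id, err)
		}
	}
	return nil
}
```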
