support round robin partitioner for kafka #759

Merged (2 commits) on Jul 1, 2024
25 changes: 0 additions & 25 deletions docs/_integration/elasticsearch/config.deprecated.yml

This file was deleted.

33 changes: 0 additions & 33 deletions docs/_integration/fluentd/config.deprecated.yml

This file was deleted.

36 changes: 0 additions & 36 deletions docs/_integration/kafka/config.deprecated.yml

This file was deleted.

4 changes: 2 additions & 2 deletions docs/_integration/kafka/config.yml
@@ -15,7 +15,7 @@ pipelines:

   - name: kafka
     kafkaproducer:
-      remote-address: 127.0.0.1
+      remote-address: 192.168.1.16
       remote-port: 9092
       connect-timeout: 5
       retry-interval: 10
@@ -29,6 +29,6 @@
       mode: flat-json
       buffer-size: 100
       topic: "dnscollector"
-      partition: 0
+      partition: null
       chan-buffer-size: 4096
       compression: none
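For context, a minimal sketch of the two partitioning modes this change enables — pinning every message to one partition versus the new round-robin default when the value is null (key names taken from the diff above; per the config struct change below, omitting the key behaves like null):

```yaml
# Pin every message to partition 0 (pre-PR behavior, still available):
kafkaproducer:
  topic: "dnscollector"
  partition: 0

# Rotate messages across all partitions of the topic (new default):
kafkaproducer:
  topic: "dnscollector"
  partition: null
```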
2 changes: 1 addition & 1 deletion docs/examples.md
@@ -5,7 +5,7 @@ You will find below some examples of configurations to manage your DNS logs.

 - Pipelines running mode
   - [x] [Advanced example with DNSmessage collector](./_examples/use-case-24.yml)
-  - [x] [How to only log slow responses and errors only?](./_examples/use-case-25.yml)
+  - [x] [How can I log only slow responses and errors?](./_examples/use-case-25.yml)

 - Capture DNS traffic from incoming DNSTap streams
   - [x] [Read from UNIX DNSTap socket and forward it to TLS stream](./_examples/use-case-5.yml)
5 changes: 3 additions & 2 deletions docs/loggers/logger_kafka.md
@@ -60,13 +60,14 @@ Options:
 > output text format, please refer to the default text format to see all available [directives](../configuration.md#custom-text-format), use this parameter if you want a specific format

 * `buffer-size` (integer)
-  > Specifies the size of the buffer for DNS messages before they are sent to Kafka.
+  > Specifies the number of DNS messages to batch before they are sent to Kafka.

 * `topic` (string)
   > Specifies the Kafka topic to which messages will be forwarded.

 * `partition` (integer)
   > Specifies the Kafka partition to which messages will be sent.
+  > If the `partition` parameter is null, the `round-robin` partitioner is used for Kafka (default behavior).

 * `chan-buffer-size` (integer)
   > Specifies the maximum number of packets that can be buffered before additional packets are discarded.
@@ -95,7 +96,7 @@ kafkaproducer:
   text-format: ""
   buffer-size: 100
   topic: "dnscollector"
-  partition: 0
+  partition: null
   chan-buffer-size: 0
   compression: none
 ```
2 changes: 1 addition & 1 deletion pkgconfig/loggers.go
@@ -292,7 +292,7 @@ type ConfigLoggers struct {
 			FlushInterval     int    `yaml:"flush-interval" default:"10"`
 			ConnectTimeout    int    `yaml:"connect-timeout" default:"5"`
 			Topic             string `yaml:"topic" default:"dnscollector"`
-			Partition         int    `yaml:"partition" default:"0"`
+			Partition         *int   `yaml:"partition" default:"nil"`
 			ChannelBufferSize int    `yaml:"chan-buffer-size" default:"0"`
 			Compression       string `yaml:"compression" default:"none"`
 		} `yaml:"kafkaproducer"`
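Why the type changes from `int` to `*int`: with a plain `int`, an unset `partition` is indistinguishable from an explicit `partition: 0`, so there would be no way to opt into round-robin. A nil pointer encodes "not set". A minimal standalone sketch of the pattern — the `kafkaConf` type is hypothetical, and `gopkg.in/yaml.v3` is used here for illustration, independent of how the project actually loads its config:

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// kafkaConf is a hypothetical stand-in for the kafkaproducer config slice.
type kafkaConf struct {
	Partition *int `yaml:"partition"`
}

func main() {
	for _, doc := range []string{"partition: 0", "partition: null", "{}"} {
		var c kafkaConf
		if err := yaml.Unmarshal([]byte(doc), &c); err != nil {
			panic(err)
		}
		if c.Partition == nil {
			// Both an explicit null and an absent key decode to a nil pointer.
			fmt.Printf("%-17s -> round-robin over all partitions\n", doc)
		} else {
			fmt.Printf("%-17s -> fixed partition %d\n", doc, *c.Partition)
		}
	}
}
```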
116 changes: 86 additions & 30 deletions workers/kafkaproducer.go
@@ -22,20 +22,23 @@ import (
 type KafkaProducer struct {
 	*GenericWorker
 	textFormat                 []string
-	kafkaConn                  *kafka.Conn
 	kafkaReady, kafkaReconnect chan bool
 	kafkaConnected             bool
 	compressCodec              compress.Codec
+	kafkaConns                 map[int]*kafka.Conn // Map to store connections by partition
 }

 func NewKafkaProducer(config *pkgconfig.Config, logger *logger.Logger, name string) *KafkaProducer {
 	bufSize := config.Global.Worker.ChannelBufferSize
 	if config.Loggers.KafkaProducer.ChannelBufferSize > 0 {
 		bufSize = config.Loggers.KafkaProducer.ChannelBufferSize
 	}
-	w := &KafkaProducer{GenericWorker: NewGenericWorker(config, logger, name, "kafka", bufSize, pkgconfig.DefaultMonitor)}
-	w.kafkaReady = make(chan bool)
-	w.kafkaReconnect = make(chan bool)
+	w := &KafkaProducer{
+		GenericWorker:  NewGenericWorker(config, logger, name, "kafka", bufSize, pkgconfig.DefaultMonitor),
+		kafkaReady:     make(chan bool),
+		kafkaReconnect: make(chan bool),
+		kafkaConns:     make(map[int]*kafka.Conn),
+	}
 	w.ReadConfig()
 	return w
 }
@@ -66,26 +69,33 @@ func (w *KafkaProducer) ReadConfig() {
}

 func (w *KafkaProducer) Disconnect() {
-	if w.kafkaConn != nil {
-		w.LogInfo("closing connection")
-		w.kafkaConn.Close()
+	// Close all Kafka connections
+	for _, conn := range w.kafkaConns {
+		if conn != nil {
+			w.LogInfo("closing connection per partition")
+			conn.Close()
+		}
 	}
+	w.kafkaConns = make(map[int]*kafka.Conn) // Clear the map
 }

 func (w *KafkaProducer) ConnectToKafka(ctx context.Context, readyTimer *time.Timer) {
 	for {
 		readyTimer.Reset(time.Duration(10) * time.Second)

-		if w.kafkaConn != nil {
-			w.kafkaConn.Close()
-			w.kafkaConn = nil
+		if len(w.kafkaConns) > 0 {
+			w.Disconnect()
 		}

 		topic := w.GetConfig().Loggers.KafkaProducer.Topic
 		partition := w.GetConfig().Loggers.KafkaProducer.Partition
 		address := w.GetConfig().Loggers.KafkaProducer.RemoteAddress + ":" + strconv.Itoa(w.GetConfig().Loggers.KafkaProducer.RemotePort)

-		w.LogInfo("connecting to kafka=%s partition=%d topic=%s", address, partition, topic)
+		if partition == nil {
+			w.LogInfo("connecting to kafka=%s partition=all topic=%s", address, topic)
+		} else {
+			w.LogInfo("connecting to kafka=%s partition=%d topic=%s", address, *partition, topic)
+		}

 		dialer := &kafka.Dialer{
 			Timeout:   time.Duration(w.GetConfig().Loggers.KafkaProducer.ConnectTimeout) * time.Second,
@@ -133,16 +143,39 @@

}

-		// connect
-		conn, err := dialer.DialLeader(ctx, "tcp", address, topic, partition)
-		if err != nil {
-			w.LogError("%s", err)
-			w.LogInfo("retry to connect in %d seconds", w.GetConfig().Loggers.KafkaProducer.RetryInterval)
-			time.Sleep(time.Duration(w.GetConfig().Loggers.KafkaProducer.RetryInterval) * time.Second)
-			continue
-		}
-
-		w.kafkaConn = conn
+		var conn *kafka.Conn
+		var err error
+
+		if partition == nil {
+			// Lookup partitions and create connections for each
+			partitions, err := dialer.LookupPartitions(ctx, "tcp", address, topic)
+			if err != nil {
+				w.LogError("failed to lookup partitions: %s", err)
+				w.LogInfo("retry to connect in %d seconds", w.GetConfig().Loggers.KafkaProducer.RetryInterval)
+				time.Sleep(time.Duration(w.GetConfig().Loggers.KafkaProducer.RetryInterval) * time.Second)
+				continue
+			}
+			for _, p := range partitions {
+				conn, err = dialer.DialLeader(ctx, "tcp", address, p.Topic, p.ID)
+				if err != nil {
+					w.LogError("failed to dial leader for partition %d: %s", p.ID, err)
+					w.LogInfo("retry to connect in %d seconds", w.GetConfig().Loggers.KafkaProducer.RetryInterval)
+					time.Sleep(time.Duration(w.GetConfig().Loggers.KafkaProducer.RetryInterval) * time.Second)
+					continue
+				}
+				w.kafkaConns[p.ID] = conn
+			}
+		} else {
+			// DialLeader directly for a specific partition
+			conn, err = dialer.DialLeader(ctx, "tcp", address, topic, *partition)
+			if err != nil {
+				w.LogError("failed to dial leader for partition %d and topic %s: %s", *partition, topic, err)
+				w.LogInfo("retry to connect in %d seconds", w.GetConfig().Loggers.KafkaProducer.RetryInterval)
+				time.Sleep(time.Duration(w.GetConfig().Loggers.KafkaProducer.RetryInterval) * time.Second)
+				continue
+			}
+			w.kafkaConns[*partition] = conn
+		}

 		// block until ready
 		w.kafkaReady <- true
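For readers unfamiliar with segmentio/kafka-go, the connection strategy above boils down to: when no partition is pinned, ask a broker for the topic's partition list, then dial each partition's leader. A minimal standalone sketch of that pattern — the broker address and topic are placeholders, and error handling is reduced to panics:

```go
package main

import (
	"context"
	"fmt"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	ctx := context.Background()
	dialer := &kafka.Dialer{Timeout: 5 * time.Second}

	// Discover every partition of the topic from any reachable broker.
	partitions, err := dialer.LookupPartitions(ctx, "tcp", "127.0.0.1:9092", "dnscollector")
	if err != nil {
		panic(err)
	}

	// Dial the leader broker of each partition and keep the connections
	// keyed by partition ID, as the worker's kafkaConns map does.
	conns := make(map[int]*kafka.Conn)
	for _, p := range partitions {
		conn, err := dialer.DialLeader(ctx, "tcp", "127.0.0.1:9092", p.Topic, p.ID)
		if err != nil {
			panic(err)
		}
		conns[p.ID] = conn
	}
	fmt.Printf("connected to %d partition leaders\n", len(conns))
}
```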
@@ -154,6 +187,7 @@ func (w *KafkaProducer) FlushBuffer(buf *[]dnsutils.DNSMessage) {
 	msgs := []kafka.Message{}
 	buffer := new(bytes.Buffer)
 	strDm := ""
+	partition := w.GetConfig().Loggers.KafkaProducer.Partition

 	for _, dm := range *buf {
 		switch w.GetConfig().Loggers.KafkaProducer.Mode {
@@ -181,18 +215,40 @@

 	}

-	// add support for msg compression
+	// add support for msg compression and round robin
 	var err error
-	if w.GetConfig().Loggers.KafkaProducer.Compression == pkgconfig.CompressNone {
-		_, err = w.kafkaConn.WriteMessages(msgs...)
-	} else {
-		_, err = w.kafkaConn.WriteCompressedMessages(w.compressCodec, msgs...)
-	}
-	if err != nil {
-		w.LogError("unable to write message", err.Error())
-		w.kafkaConnected = false
-		<-w.kafkaReconnect
-	}
+	if partition == nil {
+		index := 0
+		numPartitions := len(w.kafkaConns)
+		for _, msg := range msgs {
+			conn := w.kafkaConns[index]
+			if w.GetConfig().Loggers.KafkaProducer.Compression == pkgconfig.CompressNone {
+				_, err = conn.WriteMessages(msg)
+			} else {
+				_, err = conn.WriteCompressedMessages(w.compressCodec, msg)
+			}
+			if err != nil {
+				w.LogError("unable to write message: %s", err.Error())
+				w.kafkaConnected = false
+				<-w.kafkaReconnect
+				break
+			}
+
+			// Move to the next partition in round-robin fashion
+			index = (index + 1) % numPartitions
+		}
+	} else {
+		conn := w.kafkaConns[*partition]
+		if w.GetConfig().Loggers.KafkaProducer.Compression == pkgconfig.CompressNone {
+			_, err = conn.WriteMessages(msgs...)
+		} else {
+			_, err = conn.WriteCompressedMessages(w.compressCodec, msgs...)
+		}
+		if err != nil {
+			w.LogError("unable to write message: %s", err.Error())
+			w.kafkaConnected = false
+			<-w.kafkaReconnect
+		}
+	}

// reset buffer
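The dispatch loop above is a plain modulo rotation over the connection map. A toy sketch of the same idea, decoupled from Kafka — strings stand in for connections, and it assumes partition IDs are contiguous 0..n-1, which is how the map is populated during connect:

```go
package main

import "fmt"

func main() {
	// Stand-ins for per-partition connections keyed by partition ID.
	conns := map[int]string{0: "leader-p0", 1: "leader-p1", 2: "leader-p2"}
	msgs := []string{"dns-msg-1", "dns-msg-2", "dns-msg-3", "dns-msg-4", "dns-msg-5"}

	index := 0
	for _, msg := range msgs {
		fmt.Printf("%s -> %s\n", msg, conns[index])
		// Rotate to the next partition, wrapping around at the end.
		index = (index + 1) % len(conns)
	}
}
```

Running it spreads the five messages as p0, p1, p2, p0, p1 — the behavior the PR title describes.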