From 131a4e90601c5cf0d07d51430cf05fa164c83d20 Mon Sep 17 00:00:00 2001 From: Denis Machard <5562930+dmachard@users.noreply.github.com> Date: Mon, 1 Jul 2024 08:14:03 +0200 Subject: [PATCH] support round robin partitionner for kafka (#759) * support round robin partitionner for kafka * some fixes --- .../elasticsearch/config.deprecated.yml | 25 ---- .../fluentd/config.deprecated.yml | 33 ----- docs/_integration/kafka/config.deprecated.yml | 36 ------ docs/_integration/kafka/config.yml | 4 +- docs/examples.md | 2 +- docs/loggers/logger_kafka.md | 5 +- pkgconfig/loggers.go | 2 +- workers/kafkaproducer.go | 116 +++++++++++++----- 8 files changed, 93 insertions(+), 130 deletions(-) delete mode 100644 docs/_integration/elasticsearch/config.deprecated.yml delete mode 100644 docs/_integration/fluentd/config.deprecated.yml delete mode 100644 docs/_integration/kafka/config.deprecated.yml diff --git a/docs/_integration/elasticsearch/config.deprecated.yml b/docs/_integration/elasticsearch/config.deprecated.yml deleted file mode 100644 index 82f58e2e..00000000 --- a/docs/_integration/elasticsearch/config.deprecated.yml +++ /dev/null @@ -1,25 +0,0 @@ - -global: - trace: - verbose: true - -multiplexer: - collectors: - - name: tap - dnstap: - listen-ip: 0.0.0.0 - listen-port: 6000 - chan-buffer-size: 4096 - loggers: - - name: elastic - elasticsearch: - server: "http://192.168.1.220:9200/" - index: "dnscollector" - chan-buffer-size: 4096 - bulk-size: 5242880 - flush-interval: 10 - compression: gzip - bulk-channel-size: 10 - routes: - - from: [ tap ] - to: [ elastic ] \ No newline at end of file diff --git a/docs/_integration/fluentd/config.deprecated.yml b/docs/_integration/fluentd/config.deprecated.yml deleted file mode 100644 index 27c122ba..00000000 --- a/docs/_integration/fluentd/config.deprecated.yml +++ /dev/null @@ -1,33 +0,0 @@ - -global: - trace: - verbose: true - -multiplexer: - collectors: - - name: tap - dnstap: - listen-ip: 0.0.0.0 - listen-port: 6000 - chan-buffer-size: 4096 - loggers: - - name: fluentd - fluentd: - transport: tcp - remote-address: 127.0.0.1 - remote-port: 24224 - connect-timeout: 5 - retry-interval: 10 - flush-interval: 30 - tag: "dns.collector" - tls-insecure: false - tls-min-version: 1.2 - ca-file: "" - cert-file: "" - key-file: "" - buffer-size: 100 - chan-buffer-size: 4096 - - routes: - - from: [ tap ] - to: [ fluentd ] \ No newline at end of file diff --git a/docs/_integration/kafka/config.deprecated.yml b/docs/_integration/kafka/config.deprecated.yml deleted file mode 100644 index 6efa0890..00000000 --- a/docs/_integration/kafka/config.deprecated.yml +++ /dev/null @@ -1,36 +0,0 @@ - -global: - trace: - verbose: true - -multiplexer: - collectors: - - name: tap - dnstap: - listen-ip: 0.0.0.0 - listen-port: 6000 - chan-buffer-size: 4096 - loggers: - - name: kafka - kafkaproducer: - remote-address: 127.0.0.1 - remote-port: 9092 - connect-timeout: 5 - retry-interval: 10 - flush-interval: 30 - tls-support: false - tls-insecure: false - sasl-support: false - sasl-mechanism: PLAIN - sasl-username: false - sasl-password: false - mode: flat-json - buffer-size: 100 - topic: "dnscollector" - partition: 0 - chan-buffer-size: 4096 - compression: none - - routes: - - from: [ tap ] - to: [ kafka ] \ No newline at end of file diff --git a/docs/_integration/kafka/config.yml b/docs/_integration/kafka/config.yml index edeeb59a..2077325f 100644 --- a/docs/_integration/kafka/config.yml +++ b/docs/_integration/kafka/config.yml @@ -15,7 +15,7 @@ pipelines: - name: kafka kafkaproducer: - remote-address: 127.0.0.1 + remote-address: 192.168.1.16 remote-port: 9092 connect-timeout: 5 retry-interval: 10 @@ -29,6 +29,6 @@ pipelines: mode: flat-json buffer-size: 100 topic: "dnscollector" - partition: 0 + partition: null chan-buffer-size: 4096 compression: none \ No newline at end of file diff --git a/docs/examples.md b/docs/examples.md index b4a40bb7..9bbf25fb 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -5,7 +5,7 @@ You will find below some examples of configurations to manage your DNS logs. - Pipelines running mode - [x] [Advanced example with DNSmessage collector](./_examples/use-case-24.yml) - - [x] [How to only log slow responses and errors only?](./_examples/use-case-25.yml) + - [x] [How can I log only slow responses and errors?"](./_examples/use-case-25.yml) - Capture DNS traffic from incoming DNSTap streams - [x] [Read from UNIX DNSTap socket and forward it to TLS stream](./_examples/use-case-5.yml) diff --git a/docs/loggers/logger_kafka.md b/docs/loggers/logger_kafka.md index 9f8a2116..05094b88 100644 --- a/docs/loggers/logger_kafka.md +++ b/docs/loggers/logger_kafka.md @@ -60,13 +60,14 @@ Options: > output text format, please refer to the default text format to see all available [directives](../configuration.md#custom-text-format), use this parameter if you want a specific format * `buffer-size` (integer) - > Specifies the size of the buffer for DNS messages before they are sent to Kafka. + > Specifies the size of the bulk for DNS messages before they are sent to Kafka. * `topic` (integer) > Specifies the Kafka topic to which messages will be forwarded. * `partition` (integer) > Specifies the Kafka partition to which messages will be sent. + > If partition parameter is null, then use `round-robin` partitioner for kafka (default behavior) * `chan-buffer-size` (int) > Specifies the maximum number of packets that can be buffered before discard additional packets. @@ -95,7 +96,7 @@ kafkaproducer: text-format: "" buffer-size: 100 topic: "dnscollector" - partition: 0 + partition: null chan-buffer-size: 0 compression: none ``` diff --git a/pkgconfig/loggers.go b/pkgconfig/loggers.go index 8cf56523..55adc6a6 100644 --- a/pkgconfig/loggers.go +++ b/pkgconfig/loggers.go @@ -292,7 +292,7 @@ type ConfigLoggers struct { FlushInterval int `yaml:"flush-interval" default:"10"` ConnectTimeout int `yaml:"connect-timeout" default:"5"` Topic string `yaml:"topic" default:"dnscollector"` - Partition int `yaml:"partition" default:"0"` + Partition *int `yaml:"partition" default:"nil"` ChannelBufferSize int `yaml:"chan-buffer-size" default:"0"` Compression string `yaml:"compression" default:"none"` } `yaml:"kafkaproducer"` diff --git a/workers/kafkaproducer.go b/workers/kafkaproducer.go index a149a524..35870cea 100644 --- a/workers/kafkaproducer.go +++ b/workers/kafkaproducer.go @@ -22,10 +22,10 @@ import ( type KafkaProducer struct { *GenericWorker textFormat []string - kafkaConn *kafka.Conn kafkaReady, kafkaReconnect chan bool kafkaConnected bool compressCodec compress.Codec + kafkaConns map[int]*kafka.Conn // Map to store connections by partition } func NewKafkaProducer(config *pkgconfig.Config, logger *logger.Logger, name string) *KafkaProducer { @@ -33,9 +33,12 @@ func NewKafkaProducer(config *pkgconfig.Config, logger *logger.Logger, name stri if config.Loggers.KafkaProducer.ChannelBufferSize > 0 { bufSize = config.Loggers.KafkaProducer.ChannelBufferSize } - w := &KafkaProducer{GenericWorker: NewGenericWorker(config, logger, name, "kafka", bufSize, pkgconfig.DefaultMonitor)} - w.kafkaReady = make(chan bool) - w.kafkaReconnect = make(chan bool) + w := &KafkaProducer{ + GenericWorker: NewGenericWorker(config, logger, name, "kafka", bufSize, pkgconfig.DefaultMonitor), + kafkaReady: make(chan bool), + kafkaReconnect: make(chan bool), + kafkaConns: make(map[int]*kafka.Conn), + } w.ReadConfig() return w } @@ -66,26 +69,33 @@ func (w *KafkaProducer) ReadConfig() { } func (w *KafkaProducer) Disconnect() { - if w.kafkaConn != nil { - w.LogInfo("closing connection") - w.kafkaConn.Close() + // Close all Kafka connections + for _, conn := range w.kafkaConns { + if conn != nil { + w.LogInfo("closing connection per partition") + conn.Close() + } } + w.kafkaConns = make(map[int]*kafka.Conn) // Clear the map } func (w *KafkaProducer) ConnectToKafka(ctx context.Context, readyTimer *time.Timer) { for { readyTimer.Reset(time.Duration(10) * time.Second) - if w.kafkaConn != nil { - w.kafkaConn.Close() - w.kafkaConn = nil + if len(w.kafkaConns) > 0 { + w.Disconnect() } topic := w.GetConfig().Loggers.KafkaProducer.Topic partition := w.GetConfig().Loggers.KafkaProducer.Partition address := w.GetConfig().Loggers.KafkaProducer.RemoteAddress + ":" + strconv.Itoa(w.GetConfig().Loggers.KafkaProducer.RemotePort) - w.LogInfo("connecting to kafka=%s partition=%d topic=%s", address, partition, topic) + if partition == nil { + w.LogInfo("connecting to kafka=%s partition=all topic=%s", address, topic) + } else { + w.LogInfo("connecting to kafka=%s partition=%d topic=%s", address, *partition, topic) + } dialer := &kafka.Dialer{ Timeout: time.Duration(w.GetConfig().Loggers.KafkaProducer.ConnectTimeout) * time.Second, @@ -133,16 +143,39 @@ func (w *KafkaProducer) ConnectToKafka(ctx context.Context, readyTimer *time.Tim } - // connect - conn, err := dialer.DialLeader(ctx, "tcp", address, topic, partition) - if err != nil { - w.LogError("%s", err) - w.LogInfo("retry to connect in %d seconds", w.GetConfig().Loggers.KafkaProducer.RetryInterval) - time.Sleep(time.Duration(w.GetConfig().Loggers.KafkaProducer.RetryInterval) * time.Second) - continue - } + var conn *kafka.Conn + var err error - w.kafkaConn = conn + if partition == nil { + // Lookup partitions and create connections for each + partitions, err := dialer.LookupPartitions(ctx, "tcp", address, topic) + if err != nil { + w.LogError("failed to lookup partitions:", err) + w.LogInfo("retry to connect in %d seconds", w.GetConfig().Loggers.KafkaProducer.RetryInterval) + time.Sleep(time.Duration(w.GetConfig().Loggers.KafkaProducer.RetryInterval) * time.Second) + continue + } + for _, p := range partitions { + conn, err = dialer.DialLeader(ctx, "tcp", address, p.Topic, p.ID) + if err != nil { + w.LogError("failed to dial leader for partition %d: %s", p.ID, err) + w.LogInfo("retry to connect in %d seconds", w.GetConfig().Loggers.KafkaProducer.RetryInterval) + time.Sleep(time.Duration(w.GetConfig().Loggers.KafkaProducer.RetryInterval) * time.Second) + continue + } + w.kafkaConns[p.ID] = conn + } + } else { + // DialLeader directly for a specific partition + conn, err = dialer.DialLeader(ctx, "tcp", address, topic, *partition) + if err != nil { + w.LogError("failed to dial leader for partition %d and topic %s: %s", *partition, topic, err) + w.LogInfo("retry to connect in %d seconds", w.GetConfig().Loggers.KafkaProducer.RetryInterval) + time.Sleep(time.Duration(w.GetConfig().Loggers.KafkaProducer.RetryInterval) * time.Second) + continue + } + w.kafkaConns[*partition] = conn + } // block until is ready w.kafkaReady <- true @@ -154,6 +187,7 @@ func (w *KafkaProducer) FlushBuffer(buf *[]dnsutils.DNSMessage) { msgs := []kafka.Message{} buffer := new(bytes.Buffer) strDm := "" + partition := w.GetConfig().Loggers.KafkaProducer.Partition for _, dm := range *buf { switch w.GetConfig().Loggers.KafkaProducer.Mode { @@ -181,18 +215,40 @@ func (w *KafkaProducer) FlushBuffer(buf *[]dnsutils.DNSMessage) { } - // add support for msg compression + // add support for msg compression and round robin var err error - if w.GetConfig().Loggers.KafkaProducer.Compression == pkgconfig.CompressNone { - _, err = w.kafkaConn.WriteMessages(msgs...) - } else { - _, err = w.kafkaConn.WriteCompressedMessages(w.compressCodec, msgs...) - } + if partition == nil { + index := 0 + numPartitions := len(w.kafkaConns) + for _, msg := range msgs { + conn := w.kafkaConns[index] + if w.GetConfig().Loggers.KafkaProducer.Compression == pkgconfig.CompressNone { + _, err = conn.WriteMessages(msg) + } else { + _, err = conn.WriteCompressedMessages(w.compressCodec, msg) + } + if err != nil { + w.LogError("unable to write message", err.Error()) + w.kafkaConnected = false + <-w.kafkaReconnect + break + } - if err != nil { - w.LogError("unable to write message", err.Error()) - w.kafkaConnected = false - <-w.kafkaReconnect + // Move to the next partition in round-robin fashion + index = (index + 1) % numPartitions + } + } else { + conn := w.kafkaConns[*partition] + if w.GetConfig().Loggers.KafkaProducer.Compression == pkgconfig.CompressNone { + _, err = conn.WriteMessages(msgs...) + } else { + _, err = conn.WriteCompressedMessages(w.compressCodec, msgs...) + } + if err != nil { + w.LogError("unable to write message", err.Error()) + w.kafkaConnected = false + <-w.kafkaReconnect + } } // reset buffer