feat: add compression support for kafka logger (#482)

* add compression support for kafka logger
* add test unit

dmachard authored Nov 27, 2023
1 parent 3b6a91b commit 00f2de3
Showing 6 changed files with 61 additions and 11 deletions.
2 changes: 2 additions & 0 deletions config.yml
@@ -654,6 +654,8 @@ multiplexer:
 # partition: 0
 # # Channel buffer size for incoming packets, number of packets before dropping them.
 # chan-buffer-size: 65535
+# # Compression for Kafka messages: none, gzip, lz4, snappy, zstd
+# compression: none
 
 # # Send captured traffic to falco (https://falco.org/), for security and advanced inspection
 # falco:
2 changes: 2 additions & 0 deletions dnsutils/config.go
@@ -498,6 +498,7 @@ type Config struct {
 		Topic             string `yaml:"topic"`
 		Partition         int    `yaml:"partition"`
 		ChannelBufferSize int    `yaml:"chan-buffer-size"`
+		Compression       string `yaml:"compression"`
 	} `yaml:"kafkaproducer"`
 	FalcoClient struct {
 		Enable bool `yaml:"enable"`

@@ -821,6 +822,7 @@ func (c *Config) SetDefault() {
 	c.Loggers.KafkaProducer.Topic = "dnscollector"
 	c.Loggers.KafkaProducer.Partition = 0
 	c.Loggers.KafkaProducer.ChannelBufferSize = 65535
+	c.Loggers.KafkaProducer.Compression = CompressNone
 
 	c.Loggers.FalcoClient.Enable = false
 	c.Loggers.FalcoClient.URL = "http://127.0.0.1:9200"
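The `yaml:"compression"` tag is what ties the new config key to the struct field. A minimal, self-contained sketch of that mapping (the mirror struct and the gopkg.in/yaml.v3 dependency are illustrative choices, not part of this commit):

```go
package main

import (
	"fmt"
	"log"

	"gopkg.in/yaml.v3"
)

// kafkaProducerConf mirrors only the fields relevant here; the real fields
// live in the anonymous KafkaProducer struct inside dnsutils.Config.
type kafkaProducerConf struct {
	Topic       string `yaml:"topic"`
	Partition   int    `yaml:"partition"`
	Compression string `yaml:"compression"`
}

func main() {
	raw := []byte("topic: dnscollector\npartition: 0\ncompression: gzip\n")

	var conf kafkaProducerConf
	if err := yaml.Unmarshal(raw, &conf); err != nil {
		log.Fatal(err)
	}
	fmt.Println(conf.Compression) // prints: gzip
}
```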
10 changes: 9 additions & 1 deletion dnsutils/constants.go
@@ -1,6 +1,8 @@
 package dnsutils
 
-import "crypto/tls"
+import (
+	"crypto/tls"
+)
 
 const (
 	StrUnknown = "UNKNOWN"

@@ -61,6 +63,12 @@ const (
 	TLSV11 = "1.1"
 	TLSV12 = "1.2"
 	TLSV13 = "1.3"
+
+	CompressGzip   = "gzip"
+	CompressSnappy = "snappy"
+	CompressLz4    = "lz4"
+	CompressZstd   = "zstd"
+	CompressNone   = "none"
 )
 
 var (
2 changes: 2 additions & 0 deletions docs/loggers/logger_kafka.md
@@ -24,6 +24,7 @@ Options:
 - `topic`: (string) kafka topic to forward messages to
 - `partition`: (integer) kafka partition
 - `chan-buffer-size`: (integer) channel buffer size for incoming DNS messages; messages are dropped once the buffer is full
+- `compression`: (string) compression for Kafka messages: none, gzip, lz4, snappy, zstd
 
 Default values:
 
@@ -49,4 +50,5 @@ kafkaproducer:
   topic: "dnscollector"
   partition: 0
   chan-buffer-size: 65535
+  compression: "none"
 ```
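The same option can also be set programmatically when the logger is embedded in Go code. A sketch built from the constructor shown below; `GetFakeConfig` comes from the test file at the end of this commit, while `logger.New(false)` and the constructor invoking `ReadConfig` are assumptions about the rest of the project:

```go
package main

import (
	"github.com/dmachard/go-dnscollector/dnsutils"
	"github.com/dmachard/go-dnscollector/loggers"
	"github.com/dmachard/go-logger"
)

func main() {
	// Defaults come from Config.SetDefault(), i.e. compression "none".
	cfg := dnsutils.GetFakeConfig()
	cfg.Loggers.KafkaProducer.Compression = dnsutils.CompressGzip

	// ReadConfig (assumed to run during construction, as elsewhere in this
	// project) maps the "gzip" string to &compress.GzipCodec.
	k := loggers.NewKafkaProducer(cfg, logger.New(false), "kafka")
	_ = k
}
```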
32 changes: 30 additions & 2 deletions loggers/kafkaproducer.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"context"
 	"encoding/json"
+	"log"
 	"strconv"
 	"strings"
 	"time"
@@ -12,6 +13,7 @@ import (
 	"github.com/dmachard/go-dnscollector/transformers"
 	"github.com/dmachard/go-logger"
 	"github.com/segmentio/kafka-go"
+	"github.com/segmentio/kafka-go/compress"
 	"github.com/segmentio/kafka-go/sasl/plain"
 	"github.com/segmentio/kafka-go/sasl/scram"
 )

@@ -32,6 +34,7 @@ type KafkaProducer struct {
 	kafkaReady     chan bool
 	kafkaReconnect chan bool
 	kafkaConnected bool
+	compressCodec  compress.Codec
 }
 
 func NewKafkaProducer(config *dnsutils.Config, logger *logger.Logger, name string) *KafkaProducer {

@@ -66,6 +69,23 @@ func (k *KafkaProducer) ReadConfig() {
 	} else {
 		k.textFormat = strings.Fields(k.config.Global.TextFormat)
 	}
+
+	if k.config.Loggers.KafkaProducer.Compression != dnsutils.CompressNone {
+		switch k.config.Loggers.KafkaProducer.Compression {
+		case dnsutils.CompressGzip:
+			k.compressCodec = &compress.GzipCodec
+		case dnsutils.CompressLz4:
+			k.compressCodec = &compress.Lz4Codec
+		case dnsutils.CompressSnappy:
+			k.compressCodec = &compress.SnappyCodec
+		case dnsutils.CompressZstd:
+			k.compressCodec = &compress.ZstdCodec
+		case dnsutils.CompressNone:
+			k.compressCodec = nil
+		default:
+			log.Fatal("kafka - invalid compress mode: ", k.config.Loggers.KafkaProducer.Compression)
+		}
+	}
 }
 
 func (k *KafkaProducer) ReloadConfig(config *dnsutils.Config) {

@@ -163,6 +183,7 @@ func (k *KafkaProducer) ConnectToKafka(ctx context.Context, readyTimer *time.Timer
 
 	}
 
+	// connect
 	conn, err := dialer.DialLeader(ctx, "tcp", address, topic, partition)
 	if err != nil {
 		k.LogError("%s", err)

@@ -210,9 +231,16 @@ func (k *KafkaProducer) FlushBuffer(buf *[]dnsutils.DNSMessage) {
 
 	}
 
-	_, err := k.kafkaConn.WriteMessages(msgs...)
+	// add support for msg compression
+	var err error
+	if k.config.Loggers.KafkaProducer.Compression == dnsutils.CompressNone {
+		_, err = k.kafkaConn.WriteMessages(msgs...)
+	} else {
+		_, err = k.kafkaConn.WriteCompressedMessages(k.compressCodec, msgs...)
+	}
 
 	if err != nil {
-		k.LogError("failed to write message", err.Error())
+		k.LogError("unable to write message", err.Error())
 		k.kafkaConnected = false
 		<-k.kafkaReconnect
 	}
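A design note on the `ReadConfig` switch above: `log.Fatal` terminates the whole collector on a typo in `compression`. An alternative (not what this commit does) is a lookup helper that returns an error the caller can handle or report; a sketch using only identifiers that appear in this diff:

```go
package loggers

import (
	"fmt"

	"github.com/dmachard/go-dnscollector/dnsutils"
	"github.com/segmentio/kafka-go/compress"
)

// compressionCodec maps a configuration string to a kafka-go codec.
// A nil codec with a nil error means "send uncompressed".
func compressionCodec(name string) (compress.Codec, error) {
	switch name {
	case dnsutils.CompressNone:
		return nil, nil
	case dnsutils.CompressGzip:
		return &compress.GzipCodec, nil
	case dnsutils.CompressLz4:
		return &compress.Lz4Codec, nil
	case dnsutils.CompressSnappy:
		return &compress.SnappyCodec, nil
	case dnsutils.CompressZstd:
		return &compress.ZstdCodec, nil
	default:
		return nil, fmt.Errorf("kafka - invalid compress mode: %s", name)
	}
}
```

With such a helper, `ReadConfig` could surface the problem through the logger instead of exiting, and `FlushBuffer` would still pick `WriteMessages` or `WriteCompressedMessages` based on whether the codec is nil.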
24 changes: 16 additions & 8 deletions loggers/kafkaproducer_test.go
@@ -21,29 +21,37 @@ func Test_KafkaProducer(t *testing.T) {
 		transport string
 		address   string
 		topic     string
+		compress  string
 	}{
 		{
-			transport: "tcp",
+			transport: "compress_none",
 			address:   ":9092",
 			topic:     "dnscollector",
+			compress:  "none",
 		},
+		{
+			transport: "compress_gzip",
+			address:   ":9092",
+			topic:     "dnscollector",
+			compress:  "gzip",
+		},
 	}
 
-	// Create a new mock broker
-	// Create a dummy listener
-	mockListener, err := net.Listen("tcp", "127.0.0.1:9092")
-	if err != nil {
-		log.Fatal(err)
-	}
-
 	for _, tc := range testcases {
 		t.Run(tc.transport, func(t *testing.T) {
+			// Create a new mock broker
+			mockListener, err := net.Listen("tcp", "127.0.0.1:9092")
+			if err != nil {
+				log.Fatal(err)
+			}
 			defer mockListener.Close()
 
 			// init logger
 			cfg := dnsutils.GetFakeConfig()
 			cfg.Loggers.KafkaProducer.BufferSize = 0
 			cfg.Loggers.KafkaProducer.RemotePort = 9092
 			cfg.Loggers.KafkaProducer.Topic = tc.topic
+			cfg.Loggers.KafkaProducer.Compression = tc.compress
 
 			mockBroker := sarama.NewMockBrokerListener(t, 1, mockListener)
 			defer mockBroker.Close()
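To sanity-check a codec outside of a broker, the compression round trip can be exercised directly. A sketch assuming kafka-go v0.4.x, where the `compress` package exposes `GzipCodec` and the `Codec` interface provides `NewWriter`/`NewReader`:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"log"

	"github.com/segmentio/kafka-go/compress"
)

func main() {
	codec := &compress.GzipCodec // same codec the logger selects for "gzip"

	// Compress a payload.
	var buf bytes.Buffer
	w := codec.NewWriter(&buf)
	if _, err := w.Write([]byte(`{"qname": "dns.collector"}`)); err != nil {
		log.Fatal(err)
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}

	// Decompress and verify the round trip.
	out, err := io.ReadAll(codec.NewReader(&buf))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(out)) // {"qname": "dns.collector"}
}
```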
