diff --git a/docs/loggers/logger_kafka.md b/docs/loggers/logger_kafka.md index d35d9981..c5b7e6c6 100644 --- a/docs/loggers/logger_kafka.md +++ b/docs/loggers/logger_kafka.md @@ -56,6 +56,9 @@ Options: * `mode` (string) > Specifies the output format for Kafka messages. Output format: `text`, `json`, or `flat-json`. +* `text-format` (string) + > output text format, please refer to the default text format to see all available [directives](../configuration.md#custom-text-format), use this parameter if you want a specific format + * `buffer-size` (integer) > Specifies the size of the buffer for DNS messages before they are sent to Kafka. @@ -88,6 +91,7 @@ kafkaproducer: sasl-username: false sasl-password: false mode: flat-json + text-format: "" buffer-size: 100 topic: "dnscollector" partition: 0 diff --git a/loggers/kafkaproducer.go b/loggers/kafkaproducer.go index 1468d23c..d00ffb64 100644 --- a/loggers/kafkaproducer.go +++ b/loggers/kafkaproducer.go @@ -71,8 +71,8 @@ func (k *KafkaProducer) AddDefaultRoute(wrk pkgutils.Worker) { func (k *KafkaProducer) SetLoggers(loggers []pkgutils.Worker) {} func (k *KafkaProducer) ReadConfig() { - if len(k.config.Loggers.RedisPub.TextFormat) > 0 { - k.textFormat = strings.Fields(k.config.Loggers.RedisPub.TextFormat) + if len(k.config.Loggers.KafkaProducer.TextFormat) > 0 { + k.textFormat = strings.Fields(k.config.Loggers.KafkaProducer.TextFormat) } else { k.textFormat = strings.Fields(k.config.Global.TextFormat) } diff --git a/pkgconfig/loggers.go b/pkgconfig/loggers.go index 3da70959..c3f775b5 100644 --- a/pkgconfig/loggers.go +++ b/pkgconfig/loggers.go @@ -288,6 +288,7 @@ type ConfigLoggers struct { SaslPassword string `yaml:"sasl-password"` SaslMechanism string `yaml:"sasl-mechanism"` Mode string `yaml:"mode"` + TextFormat string `yaml:"text-format"` BufferSize int `yaml:"buffer-size"` FlushInterval int `yaml:"flush-interval"` ConnectTimeout int `yaml:"connect-timeout"` @@ -570,6 +571,7 @@ func (c *ConfigLoggers) SetDefault() { c.KafkaProducer.SaslPassword = "" c.KafkaProducer.SaslMechanism = SASLMechanismPlain c.KafkaProducer.Mode = ModeFlatJSON + c.KafkaProducer.TextFormat = "" c.KafkaProducer.BufferSize = 100 c.KafkaProducer.ConnectTimeout = 5 c.KafkaProducer.FlushInterval = 10