diff --git a/plugins/outputs/kafka/README.md b/plugins/outputs/kafka/README.md index 562f3fd5d9b74..bb410a1d5bf66 100644 --- a/plugins/outputs/kafka/README.md +++ b/plugins/outputs/kafka/README.md @@ -10,9 +10,15 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm ## Kafka topic for producer messages topic = "telegraf" - ## Optional client id + ## Optional Client id # client_id = "Telegraf" + ## Set the minimal supported Kafka version. Setting this enables the use of new + ## Kafka features and APIs. Of particular interested, lz4 compression + ## requires at least version 0.10.0.0. + ## ex: version = "1.1.0" + # version = "" + ## Optional topic suffix configuration. ## If the section is omitted, no suffix is used. ## Following topic suffix methods are supported: @@ -20,7 +26,7 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm ## tags - suffix equals to separator + specified tags' values ## interleaved with separator - ## Suffix equals to "_" + measurement's name + ## Suffix equals to "_" + measurement name # [outputs.kafka.topic_suffix] # method = "measurement" # separator = "_" diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index a45e2a4e9c3cf..bbd6b6ad0417d 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -38,6 +38,8 @@ type ( // MaxRetry Tag MaxRetry int + Version string `toml:"version"` + // Legacy TLS config options // TLS client certificate Certificate string @@ -74,6 +76,12 @@ var sampleConfig = ` ## Optional Client id # client_id = "Telegraf" + ## Set the minimal supported Kafka version. Setting this enables the use of new + ## Kafka features and APIs. Of particular interested, lz4 compression + ## requires at least version 0.10.0.0. + ## ex: version = "1.1.0" + # version = "" + ## Optional topic suffix configuration. ## If the section is omitted, no suffix is used. ## Following topic suffix methods are supported: @@ -191,6 +199,14 @@ func (k *Kafka) Connect() error { } config := sarama.NewConfig() + if k.Version != "" { + version, err := sarama.ParseKafkaVersion(k.Version) + if err != nil { + return err + } + config.Version = version + } + if k.ClientID != "" { config.ClientID = k.ClientID } else {