Skip to content

Commit

Permalink
Add support for setting kafka client id (influxdata#4418)
Browse files Browse the repository at this point in the history
  • Loading branch information
PhoenixRion authored and otherpirate committed Mar 15, 2019
1 parent c809639 commit c0ac995
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 0 deletions.
3 changes: 3 additions & 0 deletions plugins/inputs/kafka_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ and use the old zookeeper connection method.
## Offset (must be either "oldest" or "newest")
offset = "oldest"

## Optional client id
# client_id = "my_client"

## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
Expand Down
10 changes: 10 additions & 0 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

type Kafka struct {
ConsumerGroup string
ClientID string `toml:"client_id"`
Topics []string
Brokers []string
MaxMessageLen int
Expand Down Expand Up @@ -59,6 +60,9 @@ var sampleConfig = `
brokers = ["localhost:9092"]
## topic(s) to consume
topics = ["telegraf"]
## Optional Client id
# client_id = "Telegraf"
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
Expand Down Expand Up @@ -114,6 +118,12 @@ func (k *Kafka) Start(acc telegraf.Accumulator) error {
return err
}

if k.ClientID != "" {
config.ClientID = k.ClientID
} else {
config.ClientID = "Telegraf"
}

if tlsConfig != nil {
log.Printf("D! TLS Enabled")
config.Net.TLS.Config = tlsConfig
Expand Down
3 changes: 3 additions & 0 deletions plugins/outputs/kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
## Kafka topic for producer messages
topic = "telegraf"

## Optional client id
# client_id = "my_client"

## Optional topic suffix configuration.
## If the section is omitted, no suffix is used.
## Following topic suffix methods are supported:
Expand Down
11 changes: 11 additions & 0 deletions plugins/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type (
Brokers []string
// Kafka topic
Topic string
// Kafka client id
ClientID string `toml:"client_id"`
// Kafka topic suffix option
TopicSuffix TopicSuffix `toml:"topic_suffix"`
// Routing Key Tag
Expand Down Expand Up @@ -68,6 +70,9 @@ var sampleConfig = `
brokers = ["localhost:9092"]
## Kafka topic for producer messages
topic = "telegraf"
## Optional Client id
# client_id = "Telegraf"
## Optional topic suffix configuration.
## If the section is omitted, no suffix is used.
Expand Down Expand Up @@ -186,6 +191,12 @@ func (k *Kafka) Connect() error {
}
config := sarama.NewConfig()

if k.ClientID != "" {
config.ClientID = k.ClientID
} else {
config.ClientID = "Telegraf"
}

config.Producer.RequiredAcks = sarama.RequiredAcks(k.RequiredAcks)
config.Producer.Compression = sarama.CompressionCodec(k.CompressionCodec)
config.Producer.Retry.Max = k.MaxRetry
Expand Down

0 comments on commit c0ac995

Please sign in to comment.