Skip to content

Commit

Permalink
Added SSL and SASL support for input plugin kafka_consumer
Browse files Browse the repository at this point in the history
Use wurstmeister/kafka docker images for input kafka_consumer tests
  • Loading branch information
THIERRY SALLE committed May 2, 2017
1 parent 5c88965 commit 35d1513
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 52 deletions.
1 change: 1 addition & 0 deletions Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ github.com/streadway/amqp 63795daa9a446c920826655f26ba31c81c860fd6
github.com/stretchr/testify 4d4bfba8f1d1027c4fdbe371823030df51419987
github.com/vjeantet/grok d73e972b60935c7fec0b4ffbc904ed39ecaf7efe
github.com/wvanbergen/kafka bc265fedb9ff5b5c5d3c0fdcef4a819b3523d3ee
github.com/bsm/sarama-cluster 5d8c11085c875b3155870da9ba6be706429a95dc
github.com/wvanbergen/kazoo-go 968957352185472eacb69215fa3dbfcfdbac1096
github.com/yuin/gopher-lua 66c871e454fcf10251c61bf8eff02d0978cae75a
github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363
Expand Down
24 changes: 16 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,15 @@ prepare-windows:
# Run all docker containers necessary for unit tests
docker-run:
docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server:3.9.0
docker run --name zookeeper -p "2181:2181" -d wurstmeister/zookeeper
docker run --name kafka \
-e ADVERTISED_HOST=localhost \
-e ADVERTISED_PORT=9092 \
-p "2181:2181" -p "9092:9092" \
-d spotify/kafka
--link zookeeper:zookeeper \
-e KAFKA_ADVERTISED_HOST_NAME=localhost \
-e KAFKA_ADVERTISED_PORT=9092 \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_CREATE_TOPICS="test:1:1" \
-p "9092:9092" \
-d wurstmeister/kafka
docker run --name elasticsearch -p "9200:9200" -p "9300:9300" -d elasticsearch:5
docker run --name mysql -p "3306:3306" -e MYSQL_ALLOW_EMPTY_PASSWORD=yes -d mysql
docker run --name memcached -p "11211:11211" -d memcached
Expand All @@ -65,11 +69,15 @@ docker-run:
# Run docker containers necessary for CircleCI unit tests
docker-run-circle:
docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server:3.9.0
docker run --name zookeeper -p "2181:2181" -d wurstmeister/zookeeper
docker run --name kafka \
-e ADVERTISED_HOST=localhost \
-e ADVERTISED_PORT=9092 \
-p "2181:2181" -p "9092:9092" \
-d spotify/kafka
--link zookeeper:zookeeper \
-e KAFKA_ADVERTISED_HOST_NAME=localhost \
-e KAFKA_ADVERTISED_PORT=9092 \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_CREATE_TOPICS="test:1:1" \
-p "9092:9092" \
-d wurstmeister/kafka
docker run --name elasticsearch -p "9200:9200" -p "9300:9300" -d elasticsearch:5
docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
Expand Down
12 changes: 5 additions & 7 deletions internal/config/testdata/telegraf-agent.toml
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,13 @@

# read metrics from a Kafka topic
[[inputs.kafka_consumer]]
# topic(s) to consume
## kafka brokers
brokers = ["localhost:9092"]
## topic(s) to consume
topics = ["telegraf"]
# an array of Zookeeper connection strings
zookeeper_peers = ["localhost:2181"]
# the name of the consumer group
## the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
# Maximum number of points to buffer between collection intervals
point_buffer = 100000
# Offset (must be either "oldest" or "newest")
## Offset (must be either "oldest" or "newest")
offset = "oldest"

# Read metrics from a LeoFS Server via SNMP
Expand Down
13 changes: 12 additions & 1 deletion plugins/inputs/kafka_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,25 @@ from the same topic in parallel.
## topic(s) to consume
topics = ["telegraf"]
## an array of Zookeeper connection strings
zookeeper_peers = ["localhost:2181"]
brokers = ["localhost:9092"]
## the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
## Maximum number of metrics to buffer between collection intervals
metric_buffer = 100000
## Offset (must be either "oldest" or "newest")
offset = "oldest"

## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false

## Optional SASL Config
# sasl_username = "kafka"
# sasl_password = "secret"

## Data format to consume.

## Each data format has its own unique set of configuration options, read
Expand Down
105 changes: 76 additions & 29 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,35 @@ import (
"sync"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"

"github.com/Shopify/sarama"
"github.com/wvanbergen/kafka/consumergroup"
cluster "github.com/bsm/sarama-cluster"
)

type Kafka struct {
ConsumerGroup string
Topics []string
MaxMessageLen int
ZookeeperPeers []string
ZookeeperChroot string
Consumer *consumergroup.ConsumerGroup
ConsumerGroup string
Topics []string
Brokers []string
MaxMessageLen int

Cluster *cluster.Consumer

// Verify Kafka SSL Certificate
InsecureSkipVerify bool
// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`

// SASL Username
SASLUsername string `toml:"sasl_username"`
// SASL Password
SASLPassword string `toml:"sasl_password"`

// Legacy metric buffer support
MetricBuffer int
Expand All @@ -47,12 +62,22 @@ type Kafka struct {
}

var sampleConfig = `
## kafka servers
brokers = ["localhost:9092"]
## topic(s) to consume
topics = ["telegraf"]
## an array of Zookeeper connection strings
zookeeper_peers = ["localhost:2181"]
## Zookeeper Chroot
zookeeper_chroot = ""
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
## Optional SASL Config
# sasl_username = "kafka"
# sasl_password = "secret"
## the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
## Offset (must be either "oldest" or "newest")
Expand Down Expand Up @@ -84,45 +109,67 @@ func (k *Kafka) SetParser(parser parsers.Parser) {
func (k *Kafka) Start(acc telegraf.Accumulator) error {
k.Lock()
defer k.Unlock()
var consumerErr error
var clusterErr error

k.acc = acc

config := consumergroup.NewConfig()
config.Zookeeper.Chroot = k.ZookeeperChroot
config := cluster.NewConfig()
config.Consumer.Return.Errors = true

tlsConfig, err := internal.GetTLSConfig(
k.SSLCert, k.SSLKey, k.SSLCA, k.InsecureSkipVerify)
if err != nil {
return err
}

if tlsConfig != nil {
log.Printf("D! TLS Enabled")
config.Net.TLS.Config = tlsConfig
config.Net.TLS.Enable = true
}
if k.SASLUsername != "" && k.SASLPassword != "" {
log.Printf("D! Using SASL auth with username '%s',",
k.SASLUsername)
config.Net.SASL.User = k.SASLUsername
config.Net.SASL.Password = k.SASLPassword
config.Net.SASL.Enable = true
}

switch strings.ToLower(k.Offset) {
case "oldest", "":
config.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Offsets.Initial = sarama.OffsetOldest
case "newest":
config.Offsets.Initial = sarama.OffsetNewest
config.Consumer.Offsets.Initial = sarama.OffsetNewest
default:
log.Printf("I! WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n",
k.Offset)
config.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Offsets.Initial = sarama.OffsetOldest
}

if k.Consumer == nil || k.Consumer.Closed() {
k.Consumer, consumerErr = consumergroup.JoinConsumerGroup(
if k.Cluster == nil {
k.Cluster, clusterErr = cluster.NewConsumer(
k.Brokers,
k.ConsumerGroup,
k.Topics,
k.ZookeeperPeers,
config,
)
if consumerErr != nil {
return consumerErr

if clusterErr != nil {
log.Printf("E! Error when creating Kafka Consumer, brokers: %v, topics: %v\n",
k.Brokers, k.Topics)
return clusterErr
}

// Setup message and error channels
k.in = k.Consumer.Messages()
k.errs = k.Consumer.Errors()
k.in = k.Cluster.Messages()
k.errs = k.Cluster.Errors()
}

k.done = make(chan struct{})

// Start the kafka message reader
go k.receiver()
log.Printf("I! Started the kafka consumer service, peers: %v, topics: %v\n",
k.ZookeeperPeers, k.Topics)
log.Printf("I! Started the kafka consumer service, brokers: %v, topics: %v\n",
k.Brokers, k.Topics)
return nil
}

Expand Down Expand Up @@ -156,7 +203,7 @@ func (k *Kafka) receiver() {
// TODO(cam) this locking can be removed if this PR gets merged:
// https://github.com/wvanbergen/kafka/pull/84
k.Lock()
k.Consumer.CommitUpto(msg)
k.Cluster.MarkOffset(msg, "")
k.Unlock()
}
}
Expand All @@ -167,7 +214,7 @@ func (k *Kafka) Stop() {
k.Lock()
defer k.Unlock()
close(k.done)
if err := k.Consumer.Close(); err != nil {
if err := k.Cluster.Close(); err != nil {
k.acc.AddError(fmt.Errorf("Error closing consumer: %s\n", err.Error()))
}
}
Expand Down
11 changes: 5 additions & 6 deletions plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ func TestReadsMetricsFromKafka(t *testing.T) {
}

brokerPeers := []string{testutil.GetLocalHost() + ":9092"}
zkPeers := []string{testutil.GetLocalHost() + ":2181"}
testTopic := fmt.Sprintf("telegraf_test_topic_%d", time.Now().Unix())

// Send a Kafka message to the kafka host
Expand All @@ -36,11 +35,11 @@ func TestReadsMetricsFromKafka(t *testing.T) {

// Start the Kafka Consumer
k := &Kafka{
ConsumerGroup: "telegraf_test_consumers",
Topics: []string{testTopic},
ZookeeperPeers: zkPeers,
PointBuffer: 100000,
Offset: "oldest",
ConsumerGroup: "telegraf_test_consumers",
Topics: []string{testTopic},
Brokers: brokerPeers,
PointBuffer: 100000,
Offset: "oldest",
}
p, _ := parsers.NewInfluxParser()
k.SetParser(p)
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/kafka_consumer/kafka_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func newTestKafka() (*Kafka, chan *sarama.ConsumerMessage) {
k := Kafka{
ConsumerGroup: "test",
Topics: []string{"telegraf"},
ZookeeperPeers: []string{"localhost:2181"},
Brokers: []string{"localhost:9092"},
Offset: "oldest",
in: in,
doNotCommitMsgs: true,
Expand Down

0 comments on commit 35d1513

Please sign in to comment.