Skip to content

Commit

Permalink
Add max_message_len in kafka_consumer input (#2636)
Browse files Browse the repository at this point in the history
  • Loading branch information
nfirvine authored and danielnelson committed Apr 11, 2017
1 parent f55af7d commit 0193cbe
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ be deprecated eventually.
- [#2425](https://github.com/influxdata/telegraf/pull/2425): Support to include/exclude docker container labels as tags
- [#1667](https://github.com/influxdata/telegraf/pull/1667): dmcache input plugin
- [#2637](https://github.com/influxdata/telegraf/issues/2637): Add support for precision in http_listener
- [#2636](https://github.com/influxdata/telegraf/pull/2636): Add `message_len_max` option to `kafka_consumer` input

### Bugfixes

Expand Down
4 changes: 4 additions & 0 deletions plugins/inputs/kafka_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ from the same topic in parallel.
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"

## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
max_message_len = 65536
```

## Testing
Expand Down
31 changes: 20 additions & 11 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
type Kafka struct {
ConsumerGroup string
Topics []string
MaxMessageLen int
ZookeeperPeers []string
ZookeeperChroot string
Consumer *consumergroup.ConsumerGroup
Expand Down Expand Up @@ -58,10 +59,14 @@ var sampleConfig = `
offset = "oldest"
## Data format to consume.
## Each data format has it's own unique set of configuration options, read
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
max_message_len = 65536
`

func (k *Kafka) SampleConfig() string {
Expand Down Expand Up @@ -130,17 +135,21 @@ func (k *Kafka) receiver() {
return
case err := <-k.errs:
if err != nil {
k.acc.AddError(fmt.Errorf("Kafka Consumer Error: %s\n", err))
k.acc.AddError(fmt.Errorf("Consumer Error: %s\n", err))
}
case msg := <-k.in:
metrics, err := k.parser.Parse(msg.Value)
if err != nil {
k.acc.AddError(fmt.Errorf("E! Kafka Message Parse Error\nmessage: %s\nerror: %s",
string(msg.Value), err.Error()))
}

for _, metric := range metrics {
k.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
if k.MaxMessageLen != 0 && len(msg.Value) > k.MaxMessageLen {
k.acc.AddError(fmt.Errorf("Message longer than max_message_len (%d > %d)",
len(msg.Value), k.MaxMessageLen))
} else {
metrics, err := k.parser.Parse(msg.Value)
if err != nil {
k.acc.AddError(fmt.Errorf("Message Parse Error\nmessage: %s\nerror: %s",
string(msg.Value), err.Error()))
}
for _, metric := range metrics {
k.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
}

if !k.doNotCommitMsgs {
Expand All @@ -159,7 +168,7 @@ func (k *Kafka) Stop() {
defer k.Unlock()
close(k.done)
if err := k.Consumer.Close(); err != nil {
k.acc.AddError(fmt.Errorf("E! Error closing kafka consumer: %s\n", err.Error()))
k.acc.AddError(fmt.Errorf("Error closing consumer: %s\n", err.Error()))
}
}

Expand Down
18 changes: 18 additions & 0 deletions plugins/inputs/kafka_consumer/kafka_consumer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka_consumer

import (
"strings"
"testing"

"github.com/influxdata/telegraf/plugins/parsers"
Expand Down Expand Up @@ -62,6 +63,23 @@ func TestRunParserInvalidMsg(t *testing.T) {
assert.Equal(t, acc.NFields(), 0)
}

// Test that overlong messages are dropped
func TestDropOverlongMsg(t *testing.T) {
const maxMessageLen = 64 * 1024
k, in := newTestKafka()
k.MaxMessageLen = maxMessageLen
acc := testutil.Accumulator{}
k.acc = &acc
defer close(k.done)
overlongMsg := strings.Repeat("v", maxMessageLen+1)

go k.receiver()
in <- saramaMsg(overlongMsg)
acc.WaitError(1)

assert.Equal(t, acc.NFields(), 0)
}

// Test that the parser parses kafka messages into points
func TestRunParserAndGather(t *testing.T) {
k, in := newTestKafka()
Expand Down

0 comments on commit 0193cbe

Please sign in to comment.