-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add max_message_len in kafka_consumer input #2636
Add max_message_len in kafka_consumer input #2636
Conversation
5c8187d
to
0d70d9d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, I just have a few minor things.
@@ -28,6 +28,9 @@ from the same topic in parallel. | |||
## more about them here: | |||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md | |||
data_format = "influx" | |||
|
|||
## Maximum length of a message to consume, in bytes (default 0/unlimited); larger messages are dropped |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrap at 78 chars
@@ -28,6 +28,9 @@ from the same topic in parallel. | |||
## more about them here: | |||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md | |||
data_format = "influx" | |||
|
|||
## Maximum length of a message to consume, in bytes (default 0/unlimited); larger messages are dropped | |||
max_message_len = 65536 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
max_message_len = 64 * 1024
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can do math in an TOML file? TIL
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh whoops, I'm clearly just not paying much attention.
## more about them here: | ||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md | ||
data_format = "influx" | ||
|
||
## Maximum length of a message to consume, in bytes (default 0/unlimited); larger messages are dropped | ||
max_message_len = 65536 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comments as above
for _, metric := range metrics { | ||
k.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) | ||
if k.MaxMessageLen != 0 && len(msg.Value) > k.MaxMessageLen { | ||
k.acc.AddError(fmt.Errorf("D! Kafka message longer than max_message_len (%d)", k.MaxMessageLen)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets make this a W! level. AddError should put the plugin name so possibly the Kafka part is redundant, but make sure to add some information about what we are going to do. Maybe it should be "W! Message longer than max_message_len (%d); dropping message".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was trying to avoid dumping the message to the logs, since it could be very long, and thereby fill/ruin the logs. At least for me, it can make them very difficult to read.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it's just the max length here though, not the message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually I think AddError does not take a level prefix at all, and always logs at E!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, looks like. I'll remove all level tags then
@@ -62,6 +63,23 @@ func TestRunParserInvalidMsg(t *testing.T) { | |||
assert.Equal(t, acc.NFields(), 0) | |||
} | |||
|
|||
// Test that |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove comment
@@ -133,14 +137,17 @@ func (k *Kafka) receiver() { | |||
k.acc.AddError(fmt.Errorf("Kafka Consumer Error: %s\n", err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was thinking of adding a tag here, probably 'E!'. Make sense? And should it end in a newline?
Kind of scope creep, but thought I'd do it while I was in the area
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No it shouldn't have a newline, but AddError logs only at E! level.
} else { | ||
metrics, err := k.parser.Parse(msg.Value) | ||
if err != nil { | ||
k.acc.AddError(fmt.Errorf("E! Kafka Message Parse Error\nmessage: %s\nerror: %s", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logging of message parsing failures in the default logging level is IMO a bad idea and I think (for reasons I listed elsewhere should be a D!)
Idea being that you would notice metrics are being dropped from the gather_metrics
internal stat, and restart in debug mode to learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, we should remove the logging of the message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Turns out AddError forces E!-level, which makes this a much bigger change. I'm willing to just do E!
|
||
## Maximum length of a message to consume, in bytes (default 0/unlimited); | ||
## larger messages are dropped | ||
max_message_len = 64 * 1024 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I commented back a bit on this, but in case it is lost in the "outdated comments". You can't do this, I just wasn't paying attention to this being TOML in my original comment.
} | ||
for _, metric := range metrics { | ||
k.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://github.com/golang/go/wiki/CodeReviewComments#indent-error-flow
You can avoid having to increase the indentation on all this code by doing
if k.MaxMessageLen != 0 && len(msg.Value) > k.MaxMessageLen {
k.acc.AddError(fmt.Errorf("Message longer than max_message_len (%d)", k.MaxMessageLen))
continue
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it still needs to call the bit below to ack the message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, that was my concern.
for _, metric := range metrics { | ||
k.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) | ||
if k.MaxMessageLen != 0 && len(msg.Value) > k.MaxMessageLen { | ||
k.acc.AddError(fmt.Errorf("Message longer than max_message_len (%d)", k.MaxMessageLen)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error should probably contain how long the message is, not what the limit is. The user should know the limit as they configured it. But if this error is popping up, they might not know the length of the messages that are trying to come through.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a line to the CHANGELOG?
} | ||
for _, metric := range metrics { | ||
k.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it still needs to call the bit below to ack the message.
13ec6f2
to
9207ddf
Compare
Partially fixes #2606