Retry sending messages only for retriable exceptions #29
Conversation
failures << record
nil
rescue org.apache.kafka.common.errors.InterruptException => e
🔴 Change in behavior we need to understand: InterruptException was previously retried; now it isn't. We should understand why the special exception handling was introduced as retriable, and keep it if that's still valid.
Good point! InterruptException gets thrown "if the thread is interrupted while blocked" according to the KafkaProducer#send() Javadoc. I don't think this should happen unless the producer is stopped or otherwise interrupted.
The current behaviour was introduced in logstash-plugins/logstash-output-kafka#151, which doesn't motivate why InterruptException is in there.
Thanks Mark, from that conversation it seems like we were just following what the producer's send throws ... now Kafka's InterruptException doesn't inherit from RetriableException, but I think we should keep the backwards-compatible behavior, at least unless we clearly understand that most cases of potential interrupts are not recoverable.
I've explicitly added InterruptException back into the retry logic. I'm still not sure this is the best way to go (maybe rethrowing it as an InterruptedException would make more sense), but at least the behaviour is backwards compatible now.
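For illustration, the compromise described above could be sketched in JRuby along these lines (the method name retriable_exception? is illustrative, not the plugin's actual code):

# Retry Kafka's own retriable errors, plus InterruptException, which stays
# retriable for backwards compatibility even though it does not inherit from
# RetriableException. (The alternative floated above would instead re-raise
# it as a java.lang.InterruptedException.)
def retriable_exception?(e)
  e.is_a?(org.apache.kafka.common.errors.RetriableException) ||
    e.is_a?(org.apache.kafka.common.errors.InterruptException)
end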
Force-pushed from 9d581b5 to 78ab75c
@kares Do you see any other blockers to getting this merged? Thanks!
@kares any updates on this?
This is a big problem for us, generating a ton of noise in our logs. Any ETA?
LGTM in general. P.S. we are going to need a version bump, probably at least a minor one, with a changelog entry.
@praseodym Firstly, sincere apologies for the amount of time this and the previous PR in the old repo have been hanging around for. I'm OK with including this; it seems futile to keep on trying to send messages that will never be accepted by Kafka. But I think it makes sense to document the change in behavior, in a similar way to how we do in the Elasticsearch output, particularly with regard to messages that would generate non-retriable errors. cc @jsvd; docs: @karenzone
Thanks for the ping, @robbavey. @praseodym, I'm happy to review the docs when you're ready. Please let me know if you have any questions or if I can be of assistance. Thanks!
Force-pushed from 78ab75c to 5c0be42
@kares @robbavey @karenzone Thanks! I've rebased this PR and updated the docs + changelog in the most recent commit. Please take a look!
Thank you for taking the time to update the documentation with this improvement.
Docs build cleanly and LGTM!
docs/output-kafka.asciidoc (Outdated)
@@ -323,6 +323,13 @@ Kafka down, etc).

A value less than zero is a configuration error.

This plugin will only retry exceptions that are a subclass of
This is a good start; I would also mention that previous plugin versions (<= 10.4.0) kept retrying all errors forever.
Aren’t the docs versioned, i.e. previous versions of Logstash docs will not include this new paragraph? Or do you think it’s better to be explicit and add the version number?
Yes, they are, but it's quite uncommon for someone to compare docs side by side.
As noted before, the ES plugin still mentions a considerable change in behaviour on handling exceptions:
https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html#_retry_policy
I've updated the docs to also mention the old behaviour.
We will also need a version bump at:
s.version = '10.3.0'
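For reference, the bump itself would be a one-line gemspec change; 10.5.0 below is only an assumed next minor version (inferred from the "<= 10.4.0" remark above), not a number taken from this thread:

# logstash-integration-kafka.gemspec (sketch; the exact version is an assumption)
s.version = '10.5.0'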
Have been running this for about a week or so via my own fork, thank you! Works well and extremely needed.
Nil values were removed from the futures array before looping, causing wrong indexes relative to the batch array.
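A minimal sketch of why those placeholders matter (variable names and loop structure are illustrative, not the plugin's exact code): a failed send must still occupy a slot in futures so that futures[i] and batch[i] refer to the same record.

futures = batch.map do |record|
  begin
    producer.send(record)
  rescue org.apache.kafka.common.errors.InterruptException => e
    failures << record
    nil # placeholder keeps futures index-aligned with batch
  end
end

futures.each_with_index do |future, i|
  next if future.nil? # already recorded as a failure above
  begin
    future.get
  rescue java.util.concurrent.ExecutionException => e
    failures << batch[i] # batch[i] only matches because the nils were kept
  end
end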
To preserve existing behaviour.
Force-pushed from 983da6a to 04cba7a
Force-pushed from 04cba7a to f18bb0c
Done!
I'm good with the updated changes. Thank you again, @praseodym, for your contribution.
@karenzone - would you mind giving the updated docs another check?
Docs build cleanly and look great! Thanks for this contribution, @praseodym.
Partially solve infinite loops that Filebeat goes into by explicitly listing what Apache Kafka calls "retriable errors" and checking for them when an error occurs. Then, as time goes on and no new events come in, the batch will be completely dropped if `flush.timeout` is set to at least `10s`. This is needed because otherwise the breaker will be constantly open. Ideally, the breaker wouldn't open when such errors happen, but at the very least we can be smarter on Beats's side by handling errors like this. Partially inspired by logstash-plugins/logstash-integration-kafka#29. Signed-off-by: Giedrius Statkevičius <[email protected]>
Fixes #27