-
Notifications
You must be signed in to change notification settings - Fork 60
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kafka Output support manipulating kafka message headers #162
Conversation
Due to previous failure in the Travis CI build, I modified the commit and uploaded a fix for some flakiness in the "when KafkaProducer#send() raises a non-retriable exception" test case |
@roaksoax Hi, would it be possible to review this pull request? What would be needed to include this feature? |
@mashhurs Sorry for the possibly unnecessary involvement, I saw you had reviewed the last pull request. What is the way of working to get a code review for the pull request? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution @LaithSiriani!
I have placed some comments and I do believe they are relatively easy to make changes. At the same time, I am thinking (not sure so far) if we can abstract this in a decorative ways like we have in kafka-input
Can you also please add your use case in the PR description? This would also help to define the baseline.
spec/unit/outputs/kafka_spec.rb
Outdated
@@ -103,7 +113,7 @@ | |||
end | |||
|
|||
context "when KafkaProducer#send() raises a non-retriable exception" do | |||
let(:failcount) { (rand * 10).to_i } | |||
let(:failcount) { (rand * 10 + 1).to_i } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure if this is a relative change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not related to the goal of the changeset but I got annoyed that the first patchset uploaded failed due to the instability of this test case, I can revert the change if needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this test case started failing again after I reverted the change, should I re-introduce it?
lib/logstash/outputs/kafka.rb
Outdated
@@ -106,6 +106,8 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base | |||
config :max_request_size, :validate => :number, :default => 1_048_576 # (1MB) Kafka default | |||
# The key for the message | |||
config :message_key, :validate => :string | |||
# Headers added to kafka message in the form of key-value pairs | |||
config :message_headers, :validate => :hash |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having a default value would be beneficial to avoid always nil?
checks.
This also requires doc changes:
- add to the params table with alphabetical order
- explain it in the main doc content (obeys the order of the param options table)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
220db5b
to
dabe744
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall I am +1 to the feature.
I've added docs-related suggestions, and believe that we would benefit from calling this new option message_headers
(plural) because we are providing a mapping that can be plural, which aligns with things like the http output.
We may also benefit from checking the value during Kafka#register
to ensure that both keys and values are strings in order to avoid runtime surprises (I wish our built-in validators handled this, but they do not).
If we don't have an explicit use-case for making the keys variable, then I would suggest avoiding the sprintf on the keys (and changing the docs to reflect that only the values are interpolated).
20b858a
to
c42eee6
Compare
As a user of the kafka output plugin, I would like to be able to produce kafka messages with kafka headers that are either added statically or derived from the logstash event data itself, so I can filter messages in the kafka consumers. - Add the ability to set headers in the output kafka message - Add unittest and integration test case to verify
Thank you so much @LaithSiriani for addressing all comments and moving forward. If we don't have an explicit use-case for making the keys variable, then I would suggest avoiding the sprintf on the keys (and changing the docs to reflect that only the values are interpolated). We would opt out |
Hi @mashhurs , the comment was explicitly about the key, which has been addressed, the value does have a use case to be interpolated by the sprintf |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
New v11.4.0 version released with this change. |
Thanks for contributing to Logstash! If you haven't already signed our CLA, here's a handy link: https://www.elastic.co/contributor-agreement/