-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix Kafka output panic at startup #41824
Conversation
Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane) |
No changelog needed because the bug was never released. It is only on main, 8.x, and the 8.17 branch. |
I think the only thing that would catch this is an integration test, I am going to try to write one that looks like
I don't think we need actual Kafka for the regression test, just to observe that we tried to connect to it. I will either bind nothing to 9092 or maybe I'll make or find a small kafka broker. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need tests in a follow up. Merging this PR should not close the reported issue.
I have an integration test locally using a Mock Kafka broker that catches the problem, I am just polishing it. Will push soon. |
Updated with an integration test that reproduces the problem using Sarama's MockBroker to ensure we can successfully publish a message from mockbeat using the entire Beat pipeline, without needing to configure a real Kafka instance. |
@@ -113,7 +113,7 @@ func newKafkaClient( | |||
return c, nil | |||
} | |||
|
|||
func (c *client) Connect() error { | |||
func (c *client) Connect(_ context.Context) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you revert this change and run the integration test, it will fail with the panic in the original issue.
// https://github.com/elastic/sarama/blob/c7eabfcee7e5bcd7d0071f0ece4d6bec8c33928a/config_test.go#L14-L17 | ||
// The version of MockBroker used when this test was written only supports the lowest protocol version by default. | ||
// Version incompatibilities will result in message decoding errors between the mock and the beat. | ||
kafkaVersion = sarama.MinVersion |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discovering the need for this cost me 3 hours 🙃
* Make Kafka output satisfy NetworkClient interface. * Make Redis output satisfy network client. * Add initial regression integration test. * Add an integration test to ensure connectivity. * Fix build error in old integration test. * Fix redis lint error. * Fix typo in comment. * Fix another typo. (cherry picked from commit e42589d)
* Make Kafka output satisfy NetworkClient interface. * Make Redis output satisfy network client. * Add initial regression integration test. * Add an integration test to ensure connectivity. * Fix build error in old integration test. * Fix redis lint error. * Fix typo in comment. * Fix another typo. (cherry picked from commit e42589d)
* Make Kafka output satisfy NetworkClient interface. * Make Redis output satisfy network client. * Add initial regression integration test. * Add an integration test to ensure connectivity. * Fix build error in old integration test. * Fix redis lint error. * Fix typo in comment. * Fix another typo. (cherry picked from commit e42589d) Co-authored-by: Craig MacKenzie <[email protected]>
* Make Kafka output satisfy NetworkClient interface. * Make Redis output satisfy network client. * Add initial regression integration test. * Add an integration test to ensure connectivity. * Fix build error in old integration test. * Fix redis lint error. * Fix typo in comment. * Fix another typo. (cherry picked from commit e42589d) Co-authored-by: Craig MacKenzie <[email protected]>
In #40794 the Connectable interface was changed to include a
context.Context
argument, causing all clients with a Connect() that weren't updated to have aContext(context.Context)
method instead to fail the NetworkClient interface check inbeats/libbeat/publisher/pipeline/client_worker.go
Lines 64 to 70 in 367d94e
This made it so that the Kafka and Redis outputs no longer counted as NetworkClients, bypassing the initial call to
Connect()
that was there previously atbeats/libbeat/publisher/pipeline/client_worker.go
Line 143 in 367d94e
In the case of Kafka the Connect call never happening makes the output panic on the first call to
Publish
because there is no producer.Raising without tests to make sure nothing else was missed while I figure out the best test for this that isn't totally contrived, because the problem is in the publisher pipeline and the output specific tests don't hook in at that level. This is how the problem was originally missed.The PR has been updated with an integration test to reproduce the problem: #41824 (comment)