Fix Kafka output panic at startup #41824

Merged: 8 commits, Nov 28, 2024
2 changes: 1 addition & 1 deletion libbeat/outputs/kafka/client.go
@@ -113,7 +113,7 @@
return c, nil
}

-func (c *client) Connect() error {
+func (c *client) Connect(_ context.Context) error {
Member Author: If you revert this change and run the integration test, it will fail with the panic in the original issue. (See the illustrative sketch after this file's diff.)

c.mux.Lock()
defer c.mux.Unlock()

@@ -257,7 +257,7 @@
defer c.log.Debug("Stop kafka ack worker")

for libMsg := range ch {
msg := libMsg.Metadata.(*message)

GitHub Actions lint (linux) check failure on line 260 in libbeat/outputs/kafka/client.go: Error return value is not checked (errcheck)
msg.ref.done()
}
}
@@ -268,7 +268,7 @@
defer c.log.Debug("Stop kafka error handler")

for errMsg := range ch {
msg := errMsg.Msg.Metadata.(*message)

GitHub Actions lint (linux) check failure on line 271 in libbeat/outputs/kafka/client.go: Error return value is not checked (errcheck)
msg.ref.fail(msg, errMsg.Err)

if errors.Is(errMsg.Err, breaker.ErrBreakerOpen) {
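The one-line signature change above is the entire fix for the startup panic. As a rough illustration (the interface and type names below are simplified stand-ins, not libbeat's actual pipeline types), a client whose Connect signature drifts from the connection-aware interface silently falls through the pipeline's type assertion, so Connect is never called and the first Publish dereferences a nil producer:

package main

import (
	"context"
	"fmt"
)

// networkClient is a simplified stand-in for the connection-aware output
// interface; it is not libbeat's real definition.
type networkClient interface {
	Connect(ctx context.Context) error
}

// staleClient still has the pre-refactor signature Connect() error, so it no
// longer satisfies networkClient.
type staleClient struct {
	producer *string
}

func (c *staleClient) Connect() error {
	p := "producer ready"
	c.producer = &p
	return nil
}

func (c *staleClient) Publish() {
	// Panics with a nil pointer dereference if Connect was never called.
	fmt.Println(*c.producer)
}

func main() {
	var out any = &staleClient{}

	// The pipeline only connects clients that implement the interface.
	if nc, ok := out.(networkClient); ok {
		_ = nc.Connect(context.Background())
	}

	// staleClient falls through the assertion above, so producer stays nil and
	// this call panics on the first publish, analogous to the reported issue.
	out.(*staleClient).Publish()
}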
2 changes: 1 addition & 1 deletion libbeat/outputs/kafka/kafka_integration_test.go
@@ -280,7 +280,7 @@ func TestKafkaPublish(t *testing.T) {

output, ok := grp.Clients[0].(*client)
assert.True(t, ok, "grp.Clients[0] didn't contain a ptr to client")
-if err := output.Connect(); err != nil {
+if err := output.Connect(context.Background()); err != nil {
t.Fatal(err)
}
assert.Equal(t, output.index, "testbeat")
6 changes: 4 additions & 2 deletions libbeat/outputs/redis/backoff.go
@@ -19,6 +19,7 @@ package redis

import (
"context"
+"errors"
"time"

"github.com/gomodule/redigo/redis"
@@ -61,7 +62,7 @@ func newBackoffClient(client *client, init, max time.Duration) *backoffClient {
}

func (b *backoffClient) Connect(ctx context.Context) error {
-err := b.client.Connect()
+err := b.client.Connect(ctx)
if err != nil {
// give the client a chance to promote an internal error to a network error.
b.updateFailReason(err)
@@ -102,7 +103,8 @@ func (b *backoffClient) updateFailReason(err error) {
return
}

-if _, ok := err.(redis.Error); ok {
+var redisErr *redis.Error
+if errors.As(err, &redisErr) {
b.reason = failRedis
} else {
b.reason = failOther
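The updateFailReason change above swaps a direct type assertion for errors.As, which still classifies a redis error correctly when it arrives wrapped in additional context. A minimal sketch of the difference, using a local stand-in error type rather than redigo's redis.Error:

package main

import (
	"errors"
	"fmt"
)

// driverError stands in for a driver error type such as redigo's redis.Error.
type driverError string

func (e driverError) Error() string { return string(e) }

func main() {
	// Callers frequently wrap driver errors with extra context.
	err := fmt.Errorf("publish to redis failed: %w", driverError("LOADING Redis is loading the dataset"))

	// A direct type assertion only inspects the outermost error and misses it.
	_, ok := err.(driverError)
	fmt.Println("type assertion matched:", ok) // false

	// errors.As walks the wrap chain and still recognizes the driver error.
	var dErr driverError
	fmt.Println("errors.As matched:", errors.As(err, &dErr)) // true
}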
2 changes: 1 addition & 1 deletion libbeat/outputs/redis/client.go
@@ -90,7 +90,7 @@ func newClient(
}
}

-func (c *client) Connect() error {
+func (c *client) Connect(_ context.Context) error {
c.log.Debug("connect")
err := c.Client.Connect()
if err != nil {
89 changes: 89 additions & 0 deletions libbeat/tests/integration/kafka_test.go
@@ -0,0 +1,89 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//go:build integration

package integration

import (
"fmt"
"testing"
"time"

"github.com/Shopify/sarama"
)

var (
// https://github.com/elastic/sarama/blob/c7eabfcee7e5bcd7d0071f0ece4d6bec8c33928a/config_test.go#L14-L17
// The version of MockBroker used when this test was written only supports the lowest protocol version by default.
// Version incompatibilities will result in message decoding errors between the mock and the beat.
kafkaVersion = sarama.MinVersion
Member Author (on lines +31 to +34): Discovering the need for this cost me 3 hours 🙃

kafkaTopic = "test_topic"
kafkaCfg = `
mockbeat:
logging:
level: debug
selectors:
- publisher_pipeline_output
- kafka
queue.mem:
events: 4096
flush.timeout: 0s
output.kafka:
topic: %s
version: %s
hosts:
- %s
backoff:
init: 0.1s
max: 0.2s
`
)

// TestKafkaOutputCanConnectAndPublish ensures the beat Kafka output can successfully produce messages to Kafka.
// Regression test for https://github.com/elastic/beats/issues/41823 where the Kafka output would
// panic on the first Publish because its Connect method was no longer called.
func TestKafkaOutputCanConnectAndPublish(t *testing.T) {
// Create a Mock Kafka broker that will listen on localhost on a random unallocated port.
// The reference configuration was taken from https://github.com/elastic/sarama/blob/c7eabfcee7e5bcd7d0071f0ece4d6bec8c33928a/async_producer_test.go#L141.
leader := sarama.NewMockBroker(t, 1)
defer leader.Close()

// The mock broker must respond to a single metadata request.
metadataResponse := new(sarama.MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition(kafkaTopic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError)
leader.Returns(metadataResponse)

// The mock broker must return a single produce response. If no produce request is received, the test will fail.
// This guarantees that mockbeat successfully produced a message to Kafka and connectivity is established.
prodSuccess := new(sarama.ProduceResponse)
prodSuccess.AddTopicPartition(kafkaTopic, 0, sarama.ErrNoError)
leader.Returns(prodSuccess)

// Start mockbeat with the appropriate configuration.
mockbeat := NewBeat(t, "mockbeat", "../../libbeat.test")
mockbeat.WriteConfigFile(fmt.Sprintf(kafkaCfg, kafkaTopic, kafkaVersion, leader.Addr()))
mockbeat.Start()

// Wait for mockbeat to log that it successfully published a batch to Kafka.
// This ensures that mockbeat received the expected produce response configured above.
mockbeat.WaitForLogs(
`finished kafka batch`,
10*time.Second,
"did not find finished batch log")
}
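For context, here is a hedged sketch (not part of the PR) of the same metadata-then-produce exchange the mock broker above is primed for, driven by a plain sarama SyncProducer instead of mockbeat; the broker address is a placeholder standing in for leader.Addr():

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.MinVersion      // match the mock broker's lowest supported protocol version
	cfg.Producer.Return.Successes = true // SyncProducer requires success reporting

	// In the test this would be leader.Addr(); a fixed address is used here only as a placeholder.
	brokerAddr := "127.0.0.1:9092"

	// Creating the producer issues the metadata request answered by the queued MetadataResponse.
	producer, err := sarama.NewSyncProducer([]string{brokerAddr}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// Sending a message issues the produce request answered by the queued ProduceResponse.
	if _, _, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "test_topic",
		Value: sarama.StringEncoder("hello"),
	}); err != nil {
		log.Fatal(err)
	}
}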