Skip to content

Commit

Permalink
Fix Kafka output panic at startup (elastic#41824)
Browse files Browse the repository at this point in the history
* Make Kafka output satisfy NetworkClient interface.

* Make Redis output satisfy network client.

* Add initial regression integration test.

* Add an integration test to ensure connectivity.

* Fix build error in old integration test.

* Fix redis lint error.

* Fix typo in comment.

* Fix another typo.
  • Loading branch information
cmacknz authored Nov 28, 2024
1 parent 47f1502 commit e42589d
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 5 deletions.
2 changes: 1 addition & 1 deletion libbeat/outputs/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func newKafkaClient(
return c, nil
}

func (c *client) Connect() error {
func (c *client) Connect(_ context.Context) error {
c.mux.Lock()
defer c.mux.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/kafka/kafka_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func TestKafkaPublish(t *testing.T) {

output, ok := grp.Clients[0].(*client)
assert.True(t, ok, "grp.Clients[0] didn't contain a ptr to client")
if err := output.Connect(); err != nil {
if err := output.Connect(context.Background()); err != nil {
t.Fatal(err)
}
assert.Equal(t, output.index, "testbeat")
Expand Down
6 changes: 4 additions & 2 deletions libbeat/outputs/redis/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package redis

import (
"context"
"errors"
"time"

"github.com/gomodule/redigo/redis"
Expand Down Expand Up @@ -61,7 +62,7 @@ func newBackoffClient(client *client, init, max time.Duration) *backoffClient {
}

func (b *backoffClient) Connect(ctx context.Context) error {
err := b.client.Connect()
err := b.client.Connect(ctx)
if err != nil {
// give the client a chance to promote an internal error to a network error.
b.updateFailReason(err)
Expand Down Expand Up @@ -102,7 +103,8 @@ func (b *backoffClient) updateFailReason(err error) {
return
}

if _, ok := err.(redis.Error); ok {
var redisErr *redis.Error
if errors.As(err, &redisErr) {
b.reason = failRedis
} else {
b.reason = failOther
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/redis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func newClient(
}
}

func (c *client) Connect() error {
func (c *client) Connect(_ context.Context) error {
c.log.Debug("connect")
err := c.Client.Connect()
if err != nil {
Expand Down
89 changes: 89 additions & 0 deletions libbeat/tests/integration/kafka_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//go:build integration

package integration

import (
"fmt"
"testing"
"time"

"github.com/Shopify/sarama"
)

var (
// https://github.com/elastic/sarama/blob/c7eabfcee7e5bcd7d0071f0ece4d6bec8c33928a/config_test.go#L14-L17
// The version of MockBroker used when this test was written only supports the lowest protocol version by default.
// Version incompatibilities will result in message decoding errors between the mock and the beat.
kafkaVersion = sarama.MinVersion
kafkaTopic = "test_topic"
kafkaCfg = `
mockbeat:
logging:
level: debug
selectors:
- publisher_pipeline_output
- kafka
queue.mem:
events: 4096
flush.timeout: 0s
output.kafka:
topic: %s
version: %s
hosts:
- %s
backoff:
init: 0.1s
max: 0.2s
`
)

// TestKafkaOutputCanConnectAndPublish ensures the beat Kafka output can successfully produce messages to Kafka.
// Regression test for https://github.com/elastic/beats/issues/41823 where the Kafka output would
// panic on the first Publish because it's Connect method was no longer called.
func TestKafkaOutputCanConnectAndPublish(t *testing.T) {
// Create a Mock Kafka broker that will listen on localhost on a random unallocated port.
// The reference configuration was taken from https://github.com/elastic/sarama/blob/c7eabfcee7e5bcd7d0071f0ece4d6bec8c33928a/async_producer_test.go#L141.
leader := sarama.NewMockBroker(t, 1)
defer leader.Close()

// The mock broker must respond to a single metadata request.
metadataResponse := new(sarama.MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition(kafkaTopic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError)
leader.Returns(metadataResponse)

// The mock broker must return a single produce response. If no produce request is received, the test will fail.
// This guarantees that mockbeat successfully produced a message to Kafka and connectivity is established.
prodSuccess := new(sarama.ProduceResponse)
prodSuccess.AddTopicPartition(kafkaTopic, 0, sarama.ErrNoError)
leader.Returns(prodSuccess)

// Start mockbeat with the appropriate configuration.
mockbeat := NewBeat(t, "mockbeat", "../../libbeat.test")
mockbeat.WriteConfigFile(fmt.Sprintf(kafkaCfg, kafkaTopic, kafkaVersion, leader.Addr()))
mockbeat.Start()

// Wait for mockbeat to log that it successfully published a batch to Kafka.
// This ensures that mockbeat received the expected produce response configured above.
mockbeat.WaitForLogs(
`finished kafka batch`,
10*time.Second,
"did not find finished batch log")
}

0 comments on commit e42589d

Please sign in to comment.