Skip to content

Commit

Permalink
[7.17](backport #30277) Fix Kafka input for filebeat (#30307)
Browse files Browse the repository at this point in the history
* Fix Kafka input for filebeat (#30277)

The Kafka input was broken and had 2 issues:

A serialization error on filebeat output:

Looks like the ack function was put in the `reader.Message.Meta` map by mistake in
20d6038
`Meta` is of type `MapStr`, which does not support function values, so it
failed to serialize when it was later requested on output.

`ack` was not called for incoming messages:

The ack function was never invoked because it was supposed to be carried in
`beat.Event.Private`, but it was never set there.

(cherry picked from commit c2f51de)

* Fix missing struct field

* Add a delay for sarama to commit

* Add support for empty partitions

Co-authored-by: Denis Rechkunov <[email protected]>
  • Loading branch information
mergify[bot] and rdner authored Feb 10, 2022
1 parent d338c2b commit bd7067b
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 4 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d


*Filebeat*

- Fix broken Kafka input {issue}29746[29746] {pull}30277[30277]

*Heartbeat*

Expand Down
5 changes: 3 additions & 2 deletions filebeat/input/kafka/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ func (h *groupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim s
Timestamp: message.Ts,
Meta: message.Meta,
Fields: message.Fields,
Private: message.Private,
})
}
return nil
Expand Down Expand Up @@ -458,8 +459,8 @@ func composeMessage(timestamp time.Time, content []byte, kafkaFields common.MapS
"kafka": kafkaFields,
"message": string(content),
},
Meta: common.MapStr{
"ackHandler": ackHandler,
Private: eventMeta{
ackHandler: ackHandler,
},
}
}
Expand Down
47 changes: 46 additions & 1 deletion filebeat/input/kafka/kafka_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func recordHeader(key, value string) sarama.RecordHeader {

func TestInput(t *testing.T) {
testTopic := createTestTopicName()
groupID := "filebeat"

// Send test messages to the topic for the input to read.
messages := []testMessage{
Expand All @@ -88,7 +89,7 @@ func TestInput(t *testing.T) {
config := common.MustNewConfigFrom(common.MapStr{
"hosts": getTestKafkaHost(),
"topics": []string{testTopic},
"group_id": "filebeat",
"group_id": groupID,
"wait_close": 0,
})

Expand All @@ -113,11 +114,22 @@ func TestInput(t *testing.T) {
assert.Equal(t, text, msg.message)

checkMatchingHeaders(t, event, msg.headers)

// emulating the pipeline (kafkaInput.Run)
meta, ok := event.Private.(eventMeta)
if !ok {
t.Fatal("could not get eventMeta and ack the message")
}
meta.ackHandler()
case <-timeout:
t.Fatal("timeout waiting for incoming events")
}
}

// sarama commits every second, we need to make sure
// all message acks are committed before the rest of the checks
<-time.After(2 * time.Second)

// Close the done channel and make sure the beat shuts down in a reasonable
// amount of time.
cancel()
Expand All @@ -132,6 +144,8 @@ func TestInput(t *testing.T) {
t.Fatal("timeout waiting for beat to shut down")
case <-didClose:
}

assertOffset(t, groupID, testTopic, int64(len(messages)))
}

func TestInputWithMultipleEvents(t *testing.T) {
Expand Down Expand Up @@ -420,6 +434,37 @@ func getTestKafkaHost() string {
)
}

// assertOffset verifies that the committed consumer-group offsets for the
// given topic, summed across all partitions, equal the expected number of
// acknowledged messages. It fails the test fatally on any setup error so we
// never dereference a nil client/manager after a failed call.
func assertOffset(t *testing.T, groupID, topic string, expected int64) {
	t.Helper()

	client, err := sarama.NewClient([]string{getTestKafkaHost()}, nil)
	if err != nil {
		// Fatal (not assert): continuing would panic on the nil client below.
		t.Fatalf("creating kafka client: %v", err)
	}
	defer client.Close()

	ofm, err := sarama.NewOffsetManagerFromClient(groupID, client)
	if err != nil {
		t.Fatalf("creating offset manager for group %q: %v", groupID, err)
	}
	defer ofm.Close()

	partitions, err := client.Partitions(topic)
	if err != nil {
		t.Fatalf("listing partitions for topic %q: %v", topic, err)
	}

	var offsetSum int64

	for _, partitionID := range partitions {
		pom, err := ofm.ManagePartition(topic, partitionID)
		if err != nil {
			t.Fatalf("managing partition %d: %v", partitionID, err)
		}

		offset, _ := pom.NextOffset()
		// A partition that was never written to reports -1 (no committed
		// offset); skip it so it does not skew the sum.
		if offset > 0 {
			offsetSum += offset
		}

		pom.Close()
	}

	assert.Equal(t, expected, offsetSum, "offset does not match, perhaps messages were not acknowledged")
}

func writeToKafkaTopic(
t *testing.T, topic string, message string,
headers []sarama.RecordHeader, timeout time.Duration,
Expand Down
1 change: 1 addition & 0 deletions libbeat/reader/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Message struct {
Bytes int // total number of bytes read to generate the message
Fields common.MapStr // optional fields that can be added by reader
Meta common.MapStr
Private interface{}
}

// IsEmpty returns true in case the message is empty
Expand Down

0 comments on commit bd7067b

Please sign in to comment.