Skip to content

Commit

Permalink
Fix Kafka input for filebeat (#30277)
Browse files Browse the repository at this point in the history
The Kafka input was broken and had two issues:

A serialization error on filebeat output:

The ack function was mistakenly put into the `reader.Message.Meta` map in
20d6038.
`Meta` is a `MapStr` type that does not support function values, so it failed to
serialize when it was later requested on output.

`ack` was not called for incoming messages:

The ack function was never invoked because it was supposed
to be part of `beat.Event.Private`, but it was never put there.

(cherry picked from commit c2f51de)
  • Loading branch information
rdner authored and mergify-bot committed Feb 9, 2022
1 parent ff0e3b6 commit d50cdb7
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 3 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Auditbeat*

- auditd: Add error.message to events when processing fails. {pull}30009[30009]

*Filebeat*

- tcp/unix input: Stop accepting connections after socket is closed. {pull}29712[29712]
- Fix using log_group_name_prefix in aws-cloudwatch input. {pull}29695[29695]
- aws-s3: fix race condition in states used by s3-poller. {issue}30123[30123] {pull}30131[30131]
- Fix broken Kafka input. {issue}29746[29746] {pull}30277[30277]

*Heartbeat*


*Filebeat*

Expand Down
5 changes: 3 additions & 2 deletions filebeat/input/kafka/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ func (h *groupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim s
Timestamp: message.Ts,
Meta: message.Meta,
Fields: message.Fields,
Private: message.Private,
})
}
return nil
Expand Down Expand Up @@ -458,8 +459,8 @@ func composeMessage(timestamp time.Time, content []byte, kafkaFields common.MapS
"kafka": kafkaFields,
"message": string(content),
},
Meta: common.MapStr{
"ackHandler": ackHandler,
Private: eventMeta{
ackHandler: ackHandler,
},
}
}
Expand Down
39 changes: 38 additions & 1 deletion filebeat/input/kafka/kafka_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func recordHeader(key, value string) sarama.RecordHeader {

func TestInput(t *testing.T) {
testTopic := createTestTopicName()
groupID := "filebeat"

// Send test messages to the topic for the input to read.
messages := []testMessage{
Expand All @@ -88,7 +89,7 @@ func TestInput(t *testing.T) {
config := common.MustNewConfigFrom(common.MapStr{
"hosts": getTestKafkaHost(),
"topics": []string{testTopic},
"group_id": "filebeat",
"group_id": groupID,
"wait_close": 0,
})

Expand All @@ -113,6 +114,13 @@ func TestInput(t *testing.T) {
assert.Equal(t, text, msg.message)

checkMatchingHeaders(t, event, msg.headers)

// emulating the pipeline (kafkaInput.Run)
meta, ok := event.Private.(eventMeta)
if !ok {
t.Fatal("could not get eventMeta and ack the message")
}
meta.ackHandler()
case <-timeout:
t.Fatal("timeout waiting for incoming events")
}
Expand All @@ -132,6 +140,8 @@ func TestInput(t *testing.T) {
t.Fatal("timeout waiting for beat to shut down")
case <-didClose:
}

assertOffset(t, groupID, testTopic, int64(len(messages)))
}

func TestInputWithMultipleEvents(t *testing.T) {
Expand Down Expand Up @@ -420,6 +430,33 @@ func getTestKafkaHost() string {
)
}

// assertOffset verifies that the consumer group's committed offsets for the
// given topic sum up to the expected number of acknowledged messages.
// Offsets are summed across all partitions, so the check is independent of
// how messages were distributed between them.
func assertOffset(t *testing.T, groupID, topic string, expected int64) {
	t.Helper()

	client, err := sarama.NewClient([]string{getTestKafkaHost()}, nil)
	if err != nil {
		// Fail fast: continuing with a nil client would panic on the
		// deferred Close and the Partitions call below.
		t.Fatalf("failed to create kafka client: %v", err)
	}
	defer client.Close()

	ofm, err := sarama.NewOffsetManagerFromClient(groupID, client)
	if err != nil {
		t.Fatalf("failed to create offset manager: %v", err)
	}
	defer ofm.Close()

	partitions, err := client.Partitions(topic)
	if err != nil {
		t.Fatalf("failed to list partitions of topic %q: %v", topic, err)
	}

	var offsetSum int64

	for _, partitionID := range partitions {
		pom, err := ofm.ManagePartition(topic, partitionID)
		if err != nil {
			t.Fatalf("failed to manage partition %d: %v", partitionID, err)
		}

		// NextOffset returns sarama.OffsetNewest plus metadata when no
		// offset was committed; the metadata string is not needed here.
		offset, _ := pom.NextOffset()
		offsetSum += offset

		pom.Close()
	}

	assert.Equal(t, expected, offsetSum, "offset does not match, perhaps messages were not acknowledged")
}

func writeToKafkaTopic(
t *testing.T, topic string, message string,
headers []sarama.RecordHeader, timeout time.Duration,
Expand Down

0 comments on commit d50cdb7

Please sign in to comment.