diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 285cfce9026..6d1f1df87c5 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -49,6 +49,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif - tcp/unix input: Stop accepting connections after socket is closed. {pull}29712[29712] - Fix using log_group_name_prefix in aws-cloudwatch input. {pull}29695[29695] - aws-s3: fix race condition in states used by s3-poller. {issue}30123[30123] {pull}30131[30131] +- Fix broken Kafka input: preserve the event's Private field so consumed messages are acknowledged. {issue}29746[29746] {pull}30277[30277] *Heartbeat* diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index 9c872f1a14b..6440dcf9d2a 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -318,6 +318,7 @@ func (h *groupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim s Timestamp: message.Ts, Meta: message.Meta, Fields: message.Fields, + Private: message.Private, }) } return nil @@ -458,8 +459,8 @@ func composeMessage(timestamp time.Time, content []byte, kafkaFields common.MapS "kafka": kafkaFields, "message": string(content), }, - Meta: common.MapStr{ - "ackHandler": ackHandler, + Private: eventMeta{ + ackHandler: ackHandler, }, } } diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go index cb3f3526c99..58b5c89ec2f 100644 --- a/filebeat/input/kafka/kafka_integration_test.go +++ b/filebeat/input/kafka/kafka_integration_test.go @@ -62,6 +62,7 @@ func recordHeader(key, value string) sarama.RecordHeader { func TestInput(t *testing.T) { testTopic := createTestTopicName() + groupID := "filebeat" // Send test messages to the topic for the input to read. 
messages := []testMessage{ @@ -88,7 +89,7 @@ func TestInput(t *testing.T) { config := common.MustNewConfigFrom(common.MapStr{ "hosts": getTestKafkaHost(), "topics": []string{testTopic}, - "group_id": "filebeat", + "group_id": groupID, "wait_close": 0, }) @@ -113,6 +114,13 @@ func TestInput(t *testing.T) { assert.Equal(t, text, msg.message) checkMatchingHeaders(t, event, msg.headers) + + // emulating the pipeline (kafkaInput.Run) + meta, ok := event.Private.(eventMeta) + if !ok { + t.Fatal("could not get eventMeta and ack the message") + } + meta.ackHandler() case <-timeout: t.Fatal("timeout waiting for incoming events") } @@ -132,6 +140,8 @@ func TestInput(t *testing.T) { t.Fatal("timeout waiting for beat to shut down") case <-didClose: } + + assertOffset(t, groupID, testTopic, int64(len(messages))) } func TestInputWithMultipleEvents(t *testing.T) { @@ -420,6 +430,33 @@ func getTestKafkaHost() string { ) } +func assertOffset(t *testing.T, groupID, topic string, expected int64) { + client, err := sarama.NewClient([]string{getTestKafkaHost()}, nil) + assert.NoError(t, err) + defer client.Close() + + ofm, err := sarama.NewOffsetManagerFromClient(groupID, client) + assert.NoError(t, err) + defer ofm.Close() + + partitions, err := client.Partitions(topic) + assert.NoError(t, err) + + var offsetSum int64 + + for _, partitionID := range partitions { + pom, err := ofm.ManagePartition(topic, partitionID) + assert.NoError(t, err) + + offset, _ := pom.NextOffset() + offsetSum += offset + + pom.Close() + } + + assert.Equal(t, expected, offsetSum, "offset does not match, perhaps messages were not acknowledged") +} + func writeToKafkaTopic( t *testing.T, topic string, message string, headers []sarama.RecordHeader, timeout time.Duration,