From c8b6cc795ab4fce5a86f1dc2436152084901cabe Mon Sep 17 00:00:00 2001 From: bianzhu Date: Fri, 21 Jan 2022 13:11:46 +0800 Subject: [PATCH 1/5] kafka message support headers feature --- libbeat/outputs/kafka/client.go | 19 ++++++++++++++++++- libbeat/outputs/kafka/config.go | 2 ++ libbeat/outputs/kafka/kafka.go | 2 +- libbeat/outputs/kafka/message.go | 1 + 4 files changed, 22 insertions(+), 2 deletions(-) diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index c505e7d1824f..263ae05df230 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -53,6 +53,8 @@ type client struct { producer sarama.AsyncProducer + recordHeaders []sarama.RecordHeader + wg sync.WaitGroup } @@ -76,7 +78,8 @@ func newKafkaClient( index string, key *fmtstr.EventFormatString, topic outil.Selector, - writer codec.Codec, + headers map[string]string, + writer codec.Codec, cfg *sarama.Config, ) (*client, error) { c := &client{ @@ -90,6 +93,20 @@ func newKafkaClient( config: *cfg, done: make(chan struct{}), } + + if len(headers) != 0 { + recordHeaders := make([]sarama.RecordHeader, 0) + for k, v := range headers { + recordHeader := sarama.RecordHeader{ + Key: []byte(k), + Value: []byte(v), + } + + recordHeaders = append(recordHeaders, recordHeader) + } + c.recordHeaders = recordHeaders + } + return c, nil } diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index fabf09899583..0d3d483840a9 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -62,6 +62,7 @@ type kafkaConfig struct { BulkMaxSize int `config:"bulk_max_size"` BulkFlushFrequency time.Duration `config:"bulk_flush_frequency"` MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"` + Headers map[string]string `config:"headers"` Backoff backoffConfig `config:"backoff"` ClientID string `config:"client_id"` ChanBufferSize int `config:"channel_buffer_size" validate:"min=1"` @@ -125,6 +126,7 @@ func defaultConfig() kafkaConfig { CompressionLevel: 4, Version: kafka.Version("1.0.0"), MaxRetries: 3, + Headers: nil, Backoff: backoffConfig{ Init: 1 * time.Second, Max: 60 * time.Second, diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index 8f06398eb0c3..e957e7c83598 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -72,7 +72,7 @@ func makeKafka( return outputs.Fail(err) } - client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, config.Key, topic, codec, libCfg) + client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, config.Key, topic, config.Headers, codec, libCfg) if err != nil { return outputs.Fail(err) } diff --git a/libbeat/outputs/kafka/message.go b/libbeat/outputs/kafka/message.go index 16f169b8bd19..c05176c9abe7 100644 --- a/libbeat/outputs/kafka/message.go +++ b/libbeat/outputs/kafka/message.go @@ -49,5 +49,6 @@ func (m *message) initProducerMessage() { Key: sarama.ByteEncoder(m.key), Value: sarama.ByteEncoder(m.value), Timestamp: m.ts, + Headers : m.ref.client.recordHeaders, } } From 25733b28b5f91b2ab79c3d4d0f236b4cd6ad26e8 Mon Sep 17 00:00:00 2001 From: bianzhu Date: Fri, 21 Jan 2022 13:22:18 +0800 Subject: [PATCH 2/5] add kafka message headers to CHANGELOG --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 101613c4f27d..2ad6aa8f0c0e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -153,6 +153,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add support for latest k8s versions v1.23 and v1.22 {pull}29575[29575] - Only connect to Elasticsearch instances with the same version or newer. {pull}29683[29683] - Move umask from code to service files. {pull}29708[29708] +- Add support for kafka message headers. {pull}29940[29940] *Auditbeat* From 92545b10dd99fd98b8ed21c223ee05f243c02420 Mon Sep 17 00:00:00 2001 From: bianzhu Date: Thu, 10 Feb 2022 14:35:07 +0800 Subject: [PATCH 3/5] add integration test for kafka header feature --- libbeat/outputs/kafka/client.go | 4 ++-- .../outputs/kafka/kafka_integration_test.go | 21 ++++++++++++++++++- libbeat/outputs/kafka/message.go | 5 ++++- 3 files changed, 26 insertions(+), 4 deletions(-) diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index 263ae05df230..5377d422843a 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -79,7 +79,7 @@ func newKafkaClient( key *fmtstr.EventFormatString, topic outil.Selector, headers map[string]string, - writer codec.Codec, + writer codec.Codec, cfg *sarama.Config, ) (*client, error) { c := &client{ @@ -106,7 +106,7 @@ func newKafkaClient( } c.recordHeaders = recordHeaders } - + return c, nil } diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go index 2be42f639e79..6004d5a522e7 100644 --- a/libbeat/outputs/kafka/kafka_integration_test.go +++ b/libbeat/outputs/kafka/kafka_integration_test.go @@ -215,6 +215,19 @@ func TestKafkaPublish(t *testing.T) { "message": id, }), }, + { + "publish message with kafka headers to test topic", + map[string]interface{}{ + "headers": map[string]string{ + "app": "test-app", + "host": "test-host", + }, + }, + testTopic, + randMulti(5, 100, common.MapStr{ + "host": "test-host", + }), + }, } defaultConfig := map[string]interface{}{ @@ -279,6 +292,10 @@ func TestKafkaPublish(t *testing.T) { seenMsgs := map[string]struct{}{} for _, s := range stored { + if headers, exists := test.config["headers"]; exists { + assert.Equal(t, len(headers.(map[string]string)), len(s.Headers)) + } + msg := validate(t, s.Value, expected) seenMsgs[msg] = struct{}{} } @@ -371,7 +388,9 @@ func makeConfig(t *testing.T, in map[string]interface{}) *common.Config { func newTestConsumer(t *testing.T) sarama.Consumer { hosts := []string{getTestKafkaHost()} - consumer, err := sarama.NewConsumer(hosts, nil) + cfg := sarama.NewConfig() + cfg.Version = sarama.V1_0_0_0 + consumer, err := sarama.NewConsumer(hosts, cfg) if err != nil { t.Fatal(err) } diff --git a/libbeat/outputs/kafka/message.go b/libbeat/outputs/kafka/message.go index c05176c9abe7..bf77f32ac25d 100644 --- a/libbeat/outputs/kafka/message.go +++ b/libbeat/outputs/kafka/message.go @@ -49,6 +49,9 @@ func (m *message) initProducerMessage() { Key: sarama.ByteEncoder(m.key), Value: sarama.ByteEncoder(m.value), Timestamp: m.ts, - Headers : m.ref.client.recordHeaders, + } + + if m.ref != nil { + m.msg.Headers = m.ref.client.recordHeaders } } From eb1aea1dcca14c957dc3ad3b668974ffa454bd8f Mon Sep 17 00:00:00 2001 From: bianzhu Date: Thu, 10 Feb 2022 15:10:32 +0800 Subject: [PATCH 4/5] remove sarama cfg with version --- libbeat/outputs/kafka/kafka_integration_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go index 6004d5a522e7..412451a38163 100644 --- a/libbeat/outputs/kafka/kafka_integration_test.go +++ b/libbeat/outputs/kafka/kafka_integration_test.go @@ -388,9 +388,7 @@ func makeConfig(t *testing.T, in map[string]interface{}) *common.Config { func newTestConsumer(t *testing.T) sarama.Consumer { hosts := []string{getTestKafkaHost()} - cfg := sarama.NewConfig() - cfg.Version = sarama.V1_0_0_0 - consumer, err := sarama.NewConsumer(hosts, cfg) + consumer, err := sarama.NewConsumer(hosts, nil) if err != nil { t.Fatal(err) } From e90c2c1c5c89ece9c7609251c7aabfa5b04335a2 Mon Sep 17 00:00:00 2001 From: herbxu <58198659+herbxu@users.noreply.github.com> Date: Fri, 11 Feb 2022 20:37:35 +0800 Subject: [PATCH 5/5] Update libbeat/outputs/kafka/client.go Accepted this pre-allocate memory for kafka headers buffer Co-authored-by: Denis Rechkunov --- libbeat/outputs/kafka/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index 5377d422843a..149067cde340 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -95,7 +95,7 @@ func newKafkaClient( } if len(headers) != 0 { - recordHeaders := make([]sarama.RecordHeader, 0) + recordHeaders := make([]sarama.RecordHeader, 0, len(headers)) for k, v := range headers { recordHeader := sarama.RecordHeader{ Key: []byte(k),