diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 285cfce9026..12e9a4e0092 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -91,6 +91,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif
 - Add `script` processor to all beats {issue}29269[29269] {pull}29752[29752]
 - Only connect to Elasticsearch instances with the same version or newer. {pull}29683[29683]
 - Move umask from code to service files. {pull}29708[29708]
+- Add support for kafka message headers. {pull}29940[29940]
 - Add FIPS configuration option for all AWS API calls. {pull}[28899]
 - Add metadata change support for some processors {pull}30183[30183]
 
diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go
index c505e7d1824..149067cde34 100644
--- a/libbeat/outputs/kafka/client.go
+++ b/libbeat/outputs/kafka/client.go
@@ -53,6 +53,8 @@ type client struct {
 
 	producer sarama.AsyncProducer
 
+	recordHeaders []sarama.RecordHeader
+
 	wg sync.WaitGroup
 }
 
@@ -76,6 +78,7 @@ func newKafkaClient(
 	index string,
 	key *fmtstr.EventFormatString,
 	topic outil.Selector,
+	headers map[string]string,
 	writer codec.Codec,
 	cfg *sarama.Config,
 ) (*client, error) {
@@ -90,6 +93,20 @@ func newKafkaClient(
 		config: *cfg,
 		done:   make(chan struct{}),
 	}
+
+	if len(headers) != 0 {
+		recordHeaders := make([]sarama.RecordHeader, 0, len(headers))
+		for k, v := range headers {
+			recordHeader := sarama.RecordHeader{
+				Key:   []byte(k),
+				Value: []byte(v),
+			}
+
+			recordHeaders = append(recordHeaders, recordHeader)
+		}
+		c.recordHeaders = recordHeaders
+	}
+
 	return c, nil
 }
 
diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go
index fabf0989958..0d3d483840a 100644
--- a/libbeat/outputs/kafka/config.go
+++ b/libbeat/outputs/kafka/config.go
@@ -62,6 +62,7 @@ type kafkaConfig struct {
 	BulkMaxSize        int           `config:"bulk_max_size"`
 	BulkFlushFrequency time.Duration `config:"bulk_flush_frequency"`
 	MaxRetries         int           `config:"max_retries" validate:"min=-1,nonzero"`
+	Headers            map[string]string `config:"headers"`
 	Backoff            backoffConfig `config:"backoff"`
 	ClientID           string        `config:"client_id"`
 	ChanBufferSize     int           `config:"channel_buffer_size" validate:"min=1"`
@@ -125,6 +126,7 @@ func defaultConfig() kafkaConfig {
 		CompressionLevel: 4,
 		Version:          kafka.Version("1.0.0"),
 		MaxRetries:       3,
+		Headers:          nil,
 		Backoff: backoffConfig{
 			Init: 1 * time.Second,
 			Max:  60 * time.Second,
diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go
index 8f06398eb0c..e957e7c8359 100644
--- a/libbeat/outputs/kafka/kafka.go
+++ b/libbeat/outputs/kafka/kafka.go
@@ -72,7 +72,7 @@ func makeKafka(
 		return outputs.Fail(err)
 	}
 
-	client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, config.Key, topic, codec, libCfg)
+	client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, config.Key, topic, config.Headers, codec, libCfg)
 	if err != nil {
 		return outputs.Fail(err)
 	}
diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go
index 2be42f639e7..412451a3816 100644
--- a/libbeat/outputs/kafka/kafka_integration_test.go
+++ b/libbeat/outputs/kafka/kafka_integration_test.go
@@ -215,6 +215,19 @@ func TestKafkaPublish(t *testing.T) {
 				"message": id,
 			}),
 		},
+		{
+			"publish message with kafka headers to test topic",
+			map[string]interface{}{
+				"headers": map[string]string{
+					"app":  "test-app",
+					"host": "test-host",
+				},
+			},
+			testTopic,
+			randMulti(5, 100, common.MapStr{
+				"host": "test-host",
+			}),
+		},
 	}
 
 	defaultConfig := map[string]interface{}{
@@ -279,6 +292,10 @@ func TestKafkaPublish(t *testing.T) {
 
 			seenMsgs := map[string]struct{}{}
 			for _, s := range stored {
+				if headers, exists := test.config["headers"]; exists {
+					assert.Equal(t, len(headers.(map[string]string)), len(s.Headers))
+				}
+
 				msg := validate(t, s.Value, expected)
 				seenMsgs[msg] = struct{}{}
 			}
diff --git a/libbeat/outputs/kafka/message.go b/libbeat/outputs/kafka/message.go
index 16f169b8bd1..bf77f32ac25 100644
--- a/libbeat/outputs/kafka/message.go
+++ b/libbeat/outputs/kafka/message.go
@@ -50,4 +50,8 @@ func (m *message) initProducerMessage() {
 		Value:     sarama.ByteEncoder(m.value),
 		Timestamp: m.ts,
 	}
+
+	if m.ref != nil {
+		m.msg.Headers = m.ref.client.recordHeaders
+	}
 }