Skip to content

Commit

Permalink
[libbeat] kafka message header support (#29940)
Browse files Browse the repository at this point in the history
Now it's possible to configure headers that will be attached to each message in the Kafka output.

Co-authored-by: Denis Rechkunov <[email protected]>
  • Loading branch information
herbxu and rdner authored Feb 12, 2022
1 parent 06c810c commit 1bba206
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif
- Add `script` processor to all beats {issue}29269[29269] {pull}29752[29752]
- Only connect to Elasticsearch instances with the same version or newer. {pull}29683[29683]
- Move umask from code to service files. {pull}29708[29708]
- Add support for kafka message headers. {pull}29940[29940]
- Add FIPS configuration option for all AWS API calls. {pull}[28899]
- Add metadata change support for some processors {pull}30183[30183]

Expand Down
17 changes: 17 additions & 0 deletions libbeat/outputs/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type client struct {

producer sarama.AsyncProducer

recordHeaders []sarama.RecordHeader

wg sync.WaitGroup
}

Expand All @@ -76,6 +78,7 @@ func newKafkaClient(
index string,
key *fmtstr.EventFormatString,
topic outil.Selector,
headers map[string]string,
writer codec.Codec,
cfg *sarama.Config,
) (*client, error) {
Expand All @@ -90,6 +93,20 @@ func newKafkaClient(
config: *cfg,
done: make(chan struct{}),
}

if len(headers) != 0 {
recordHeaders := make([]sarama.RecordHeader, 0, len(headers))
for k, v := range headers {
recordHeader := sarama.RecordHeader{
Key: []byte(k),
Value: []byte(v),
}

recordHeaders = append(recordHeaders, recordHeader)
}
c.recordHeaders = recordHeaders
}

return c, nil
}

Expand Down
2 changes: 2 additions & 0 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type kafkaConfig struct {
BulkMaxSize int `config:"bulk_max_size"`
BulkFlushFrequency time.Duration `config:"bulk_flush_frequency"`
MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"`
Headers map[string]string `config:"headers"`
Backoff backoffConfig `config:"backoff"`
ClientID string `config:"client_id"`
ChanBufferSize int `config:"channel_buffer_size" validate:"min=1"`
Expand Down Expand Up @@ -125,6 +126,7 @@ func defaultConfig() kafkaConfig {
CompressionLevel: 4,
Version: kafka.Version("1.0.0"),
MaxRetries: 3,
Headers: nil,
Backoff: backoffConfig{
Init: 1 * time.Second,
Max: 60 * time.Second,
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func makeKafka(
return outputs.Fail(err)
}

client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, config.Key, topic, codec, libCfg)
client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, config.Key, topic, config.Headers, codec, libCfg)
if err != nil {
return outputs.Fail(err)
}
Expand Down
17 changes: 17 additions & 0 deletions libbeat/outputs/kafka/kafka_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,19 @@ func TestKafkaPublish(t *testing.T) {
"message": id,
}),
},
{
"publish message with kafka headers to test topic",
map[string]interface{}{
"headers": map[string]string{
"app": "test-app",
"host": "test-host",
},
},
testTopic,
randMulti(5, 100, common.MapStr{
"host": "test-host",
}),
},
}

defaultConfig := map[string]interface{}{
Expand Down Expand Up @@ -279,6 +292,10 @@ func TestKafkaPublish(t *testing.T) {

seenMsgs := map[string]struct{}{}
for _, s := range stored {
if headers, exists := test.config["headers"]; exists {
assert.Equal(t, len(headers.(map[string]string)), len(s.Headers))
}

msg := validate(t, s.Value, expected)
seenMsgs[msg] = struct{}{}
}
Expand Down
4 changes: 4 additions & 0 deletions libbeat/outputs/kafka/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,8 @@ func (m *message) initProducerMessage() {
Value: sarama.ByteEncoder(m.value),
Timestamp: m.ts,
}

if m.ref != nil {
m.msg.Headers = m.ref.client.recordHeaders
}
}

0 comments on commit 1bba206

Please sign in to comment.