From 032dbac5c6d4a326e331e28e2458724333c8228c Mon Sep 17 00:00:00 2001
From: Michael Bischoff
Date: Sun, 11 Jul 2021 03:02:03 +0200
Subject: [PATCH 1/2] Adding the option to use the JSON payload as structured
 data on the kafka input instead of a string containing JSON

---
 filebeat/docs/inputs/input-kafka.asciidoc       |   4 +
 filebeat/input/kafka/config.go                  |  15 ++
 filebeat/input/kafka/input.go                   | 144 ++++++++++++-----
 .../input/kafka/kafka_integration_test.go       | 147 ++++++++++++++++++
 4 files changed, 272 insertions(+), 38 deletions(-)

diff --git a/filebeat/docs/inputs/input-kafka.asciidoc b/filebeat/docs/inputs/input-kafka.asciidoc
index e134fb86404..c3025d5cb41 100644
--- a/filebeat/docs/inputs/input-kafka.asciidoc
+++ b/filebeat/docs/inputs/input-kafka.asciidoc
@@ -168,6 +168,10 @@ Configuration options for Kerberos authentication.
 
 See <> for more information.
 
+===== `payload_type`
+
+This configures how the input handles the payload. The default, `string`, puts the raw payload into the `message` field. The other option, `json`, attempts to parse the payload as JSON and merges the resulting structure into the top level of the event.
+
 [id="{beatname_lc}-input-{type}-common-options"]
 
 include::../inputs/input-common-options.asciidoc[]
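For illustration only, a minimal input configuration enabling the new option might look like the sketch below; the host and topic values are placeholders and are not taken from this patch:

----
filebeat.inputs:
- type: kafka
  hosts: ["localhost:9092"]
  topics: ["example-topic"]
  group_id: "filebeat"
  payload_type: json
----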
diff --git a/filebeat/input/kafka/config.go b/filebeat/input/kafka/config.go
index c69b2522a4f..727f7ff3252 100644
--- a/filebeat/input/kafka/config.go
+++ b/filebeat/input/kafka/config.go
@@ -53,6 +53,7 @@ type kafkaInputConfig struct {
     Username                 string `config:"username"`
     Password                 string `config:"password"`
     ExpandEventListFromField string `config:"expand_event_list_from_field"`
+    PayloadType              string `config:"payload_type"`
 }
 
 type kafkaFetch struct {
@@ -127,6 +128,7 @@ func defaultConfig() kafkaInputConfig {
             MaxRetries:   4,
             RetryBackoff: 2 * time.Second,
         },
+        PayloadType: "string",
     }
 }
 
@@ -143,6 +145,10 @@ func (c *kafkaInputConfig) Validate() error {
     if c.Username != "" && c.Password == "" {
         return fmt.Errorf("password must be set when username is configured")
     }
+
+    if !stringInSlice(c.PayloadType, []string{"string", "json"}) {
+        return fmt.Errorf("invalid value for payload_type: %s, supported values are: string, json", c.PayloadType)
+    }
     return nil
 }
 
@@ -272,3 +278,12 @@ func (is *isolationLevel) Unpack(value string) error {
     *is = isolationLevel
     return nil
 }
+
+func stringInSlice(str string, list []string) bool {
+    for _, v := range list {
+        if v == str {
+            return true
+        }
+    }
+    return false
+}
diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go
index add3c664224..d58d2b6b537 100644
--- a/filebeat/input/kafka/input.go
+++ b/filebeat/input/kafka/input.go
@@ -110,6 +110,7 @@ func (input *kafkaInput) runConsumerGroup(
         outlet: input.outlet,
         // expandEventListFromField will be assigned the configuration option expand_event_list_from_field
         expandEventListFromField: input.config.ExpandEventListFromField,
+        payloadType:              input.config.PayloadType,
         log: input.log,
     }
 
@@ -241,6 +242,7 @@ type groupHandler struct {
     // if the fileset using this input expects to receive multiple messages bundled under a specific field then this value is assigned
     // ex. in this case are the azure fielsets where the events are found under the json object "records"
     expandEventListFromField string
+    payloadType              string
     log                      *logp.Logger
 }
 
@@ -275,30 +277,15 @@ func (h *groupHandler) createEvents(
         kafkaFields["headers"] = arrayForKafkaHeaders(message.Headers)
     }
 
-    // if expandEventListFromField has been set, then a check for the actual json object will be done and a return for multiple messages is executed
-    var events []beat.Event
-    var messages []string
-    if h.expandEventListFromField == "" {
-        messages = []string{string(message.Value)}
-    } else {
-        messages = h.parseMultipleMessages(message.Value)
+    switch h.payloadType {
+    case "json":
+        return h.parseJsonPayload(message, timestamp, kafkaFields)
+    case "string":
+        return h.parseStringPayload(message, timestamp, kafkaFields)
+    default:
+        return h.parseStringPayload(message, timestamp, kafkaFields)
     }
 
-    for _, msg := range messages {
-        event := beat.Event{
-            Timestamp: timestamp,
-            Fields: common.MapStr{
-                "message": msg,
-                "kafka":   kafkaFields,
-            },
-            Private: eventMeta{
-                handler: h,
-                message: message,
-            },
-        }
-        events = append(events, event)
-    }
-    return events
 }
 
 func (h *groupHandler) Setup(session sarama.ConsumerGroupSession) error {
@@ -335,24 +322,105 @@ func (h *groupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sara
     return nil
 }
 
-// parseMultipleMessages will try to split the message into multiple ones based on the group field provided by the configuration
-func (h *groupHandler) parseMultipleMessages(bMessage []byte) []string {
-    var obj map[string][]interface{}
-    err := json.Unmarshal(bMessage, &obj)
-    if err != nil {
-        h.log.Errorw(fmt.Sprintf("Kafka desirializing multiple messages using the group object %s", h.expandEventListFromField), "error", err)
-        return []string{}
+func (h *groupHandler) parseStringPayload(message *sarama.ConsumerMessage, timestamp time.Time, kafkaFields common.MapStr) []beat.Event {
+    // if expandEventListFromField has been set, then a check for the actual json object will be done and a return for multiple messages is executed
+    if h.expandEventListFromField == "" {
+        return []beat.Event{
+            {
+                Timestamp: timestamp,
+                Fields: common.MapStr{
+                    "message": string(message.Value),
+                    "kafka":   kafkaFields,
+                },
+                Private: eventMeta{
+                    handler: h,
+                    message: message,
+                },
+            },
+        }
+    } else {
+        // try to split the message into multiple ones based on the group field provided by the configuration
+        var obj map[string][]common.MapStr
+        err := json.Unmarshal(message.Value, &obj)
+        if err != nil {
+            h.log.Errorw(fmt.Sprintf("Kafka deserializing multiple messages using the group object %s", h.expandEventListFromField), "error", err)
+            return []beat.Event{}
+        }
+        var events []beat.Event
+        if len(obj[h.expandEventListFromField]) > 0 {
+            for _, ms := range obj[h.expandEventListFromField] {
+                js, err := json.Marshal(ms)
+                if err == nil {
+                    event := beat.Event{
+                        Timestamp: timestamp,
+                        Fields: common.MapStr{
+                            "message": string(js),
+                            "kafka":   kafkaFields,
+                        },
+                        Private: eventMeta{
+                            handler: h,
+                            message: message,
+                        },
+                    }
+                    events = append(events, event)
+                } else {
+                    h.log.Errorw(fmt.Sprintf("Kafka serializing message %s", ms), "error", err)
+                }
+            }
+        }
+        return events
     }
-    var messages []string
-    if len(obj[h.expandEventListFromField]) > 0 {
-        for _, ms := range obj[h.expandEventListFromField] {
-            js, err := json.Marshal(ms)
-            if err == nil {
-                messages = append(messages, string(js))
-            } else {
-                h.log.Errorw(fmt.Sprintf("Kafka serializing message %s", ms), "error", err)
+}
+
+func (h *groupHandler) parseJsonPayload(message *sarama.ConsumerMessage, timestamp time.Time, kafkaFields common.MapStr) []beat.Event {
+    // if expandEventListFromField has been set, then a check for the actual json object will be done and a return for multiple messages is executed
+    if h.expandEventListFromField == "" {
+        var obj common.MapStr
+        err := json.Unmarshal(message.Value, &obj)
+        if err != nil {
+            h.log.Errorw("Kafka deserializing json payload failed", "error", err)
+            return []beat.Event{}
+        }
+        var fields = common.MapStr{
+            "kafka": kafkaFields,
+        }
+        fields.Update(obj)
+        return []beat.Event{
+            {
+                Timestamp: timestamp,
+                Fields:    fields,
+                Private: eventMeta{
+                    handler: h,
+                    message: message,
+                },
+            },
+        }
+    } else {
+        // try to split the message into multiple ones based on the group field provided by the configuration
+        var obj map[string][]common.MapStr
+        err := json.Unmarshal(message.Value, &obj)
+        if err != nil {
+            h.log.Errorw(fmt.Sprintf("Kafka deserializing multiple messages using the group object %s", h.expandEventListFromField), "error", err)
+            return []beat.Event{}
+        }
+        var events []beat.Event
+        if len(obj[h.expandEventListFromField]) > 0 {
+            for _, ms := range obj[h.expandEventListFromField] {
+                var fields = common.MapStr{
+                    "kafka": kafkaFields,
+                }
+                fields.Update(ms)
+                event := beat.Event{
+                    Timestamp: timestamp,
+                    Fields:    fields,
+                    Private: eventMeta{
+                        handler: h,
+                        message: message,
+                    },
+                }
+                events = append(events, event)
             }
         }
+        return events
     }
-    return messages
 }
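To make the difference between the two payload types concrete, here is a sketch (in YAML form, using the payload from the tests that follow) of roughly what a single Kafka record turns into; the exact contents of the `kafka` field come from the existing input code and are elided here:

----
# Record value on the topic: {"val":"val1"}

# payload_type: string (default)
message: '{"val":"val1"}'
kafka: { ... }

# payload_type: json
val: "val1"
kafka: { ... }
----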
diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go
index c7ad9fc999c..1e66719455e 100644
--- a/filebeat/input/kafka/kafka_integration_test.go
+++ b/filebeat/input/kafka/kafka_integration_test.go
@@ -256,6 +256,153 @@ func TestInputWithMultipleEvents(t *testing.T) {
     }
 }
 
+func TestInputWithJsonPayload(t *testing.T) {
+    id := strconv.Itoa(rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int())
+    testTopic := fmt.Sprintf("Filebeat-TestInput-%s", id)
+    context := input.Context{
+        Done:     make(chan struct{}),
+        BeatDone: make(chan struct{}),
+    }
+
+    // Send test message to the topic for the input to read.
+    message := testMessage{
+        message: "{\"val\":\"val1\"}",
+        headers: []sarama.RecordHeader{
+            recordHeader("X-Test-Header", "test header value"),
+        },
+    }
+    writeToKafkaTopic(t, testTopic, message.message, message.headers, time.Second*20)
+
+    // Setup the input config
+    config := common.MustNewConfigFrom(common.MapStr{
+        "hosts":        getTestKafkaHost(),
+        "topics":       []string{testTopic},
+        "group_id":     "filebeat",
+        "wait_close":   0,
+        "payload_type": "json",
+    })
+
+    // Route input events through our capturer instead of sending through ES.
+    events := make(chan beat.Event, 100)
+    defer close(events)
+    capturer := NewEventCapturer(events)
+    defer capturer.Close()
+    connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) {
+        return channel.SubOutlet(capturer), nil
+    })
+
+    input, err := NewInput(config, connector, context)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    // Run the input and wait for finalization
+    input.Run()
+
+    timeout := time.After(30 * time.Second)
+    select {
+    case event := <-events:
+        text, err := event.Fields.GetValue("val")
+        if err != nil {
+            t.Fatal(err)
+        }
+        msgs := []string{"val1"}
+        assert.Contains(t, msgs, text)
+        checkMatchingHeaders(t, event, message.headers)
+    case <-timeout:
+        t.Fatal("timeout waiting for incoming events")
+    }
+
+    // Close the done channel and make sure the beat shuts down in a reasonable
+    // amount of time.
+    close(context.Done)
+    didClose := make(chan struct{})
+    go func() {
+        input.Wait()
+        close(didClose)
+    }()
+
+    select {
+    case <-time.After(30 * time.Second):
+        t.Fatal("timeout waiting for beat to shut down")
+    case <-didClose:
+    }
+}
+
+func TestInputWithJsonPayloadAndMultipleEvents(t *testing.T) {
+    id := strconv.Itoa(rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int())
+    testTopic := fmt.Sprintf("Filebeat-TestInput-%s", id)
+    context := input.Context{
+        Done:     make(chan struct{}),
+        BeatDone: make(chan struct{}),
+    }
+
+    // Send test messages to the topic for the input to read.
+    message := testMessage{
+        message: "{\"records\": [{\"val\":\"val1\"}, {\"val\":\"val2\"}]}",
+        headers: []sarama.RecordHeader{
+            recordHeader("X-Test-Header", "test header value"),
+        },
+    }
+    writeToKafkaTopic(t, testTopic, message.message, message.headers, time.Second*20)
+
+    // Setup the input config
+    config := common.MustNewConfigFrom(common.MapStr{
+        "hosts":                        getTestKafkaHost(),
+        "topics":                       []string{testTopic},
+        "group_id":                     "filebeat",
+        "wait_close":                   0,
+        "expand_event_list_from_field": "records",
+        "payload_type":                 "json",
+    })
+
+    // Route input events through our capturer instead of sending through ES.
+    events := make(chan beat.Event, 100)
+    defer close(events)
+    capturer := NewEventCapturer(events)
+    defer capturer.Close()
+    connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) {
+        return channel.SubOutlet(capturer), nil
+    })
+
+    input, err := NewInput(config, connector, context)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    // Run the input and wait for finalization
+    input.Run()
+
+    timeout := time.After(30 * time.Second)
+    select {
+    case event := <-events:
+        text, err := event.Fields.GetValue("val")
+        if err != nil {
+            t.Fatal(err)
+        }
+        msgs := []string{"val1", "val2"}
+        assert.Contains(t, msgs, text)
+        checkMatchingHeaders(t, event, message.headers)
+    case <-timeout:
+        t.Fatal("timeout waiting for incoming events")
+    }
+
+    // Close the done channel and make sure the beat shuts down in a reasonable
+    // amount of time.
+    close(context.Done)
+    didClose := make(chan struct{})
+    go func() {
+        input.Wait()
+        close(didClose)
+    }()
+
+    select {
+    case <-time.After(30 * time.Second):
+        t.Fatal("timeout waiting for beat to shut down")
+    case <-didClose:
+    }
+}
+
 func findMessage(t *testing.T, text string, msgs []testMessage) *testMessage {
     var msg *testMessage
     for _, m := range msgs {

From ad02f7b9638a742a21d4e2547abe85b591db6cd7 Mon Sep 17 00:00:00 2001
From: Michael Bischoff
Date: Sun, 11 Jul 2021 03:08:29 +0200
Subject: [PATCH 2/2] Improving tests to ensure we get at least the number of
 messages we're expecting.

---
 .../input/kafka/kafka_integration_test.go | 44 ++++++++++---------
 1 file changed, 24 insertions(+), 20 deletions(-)

diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go
index 1e66719455e..1f11c2da7d8 100644
--- a/filebeat/input/kafka/kafka_integration_test.go
+++ b/filebeat/input/kafka/kafka_integration_test.go
@@ -227,17 +227,19 @@ func TestInputWithMultipleEvents(t *testing.T) {
     input.Run()
 
     timeout := time.After(30 * time.Second)
-    select {
-    case event := <-events:
-        text, err := event.Fields.GetValue("message")
-        if err != nil {
-            t.Fatal(err)
+    for i := 0; i < 2; i++ {
+        select {
+        case event := <-events:
+            text, err := event.Fields.GetValue("message")
+            if err != nil {
+                t.Fatal(err)
+            }
+            msgs := []string{"{\"val\":\"val1\"}", "{\"val\":\"val2\"}"}
+            assert.Contains(t, msgs, text)
+            checkMatchingHeaders(t, event, message.headers)
+        case <-timeout:
+            t.Fatal("timeout waiting for incoming events")
         }
-        msgs := []string{"{\"val\":\"val1\"}", "{\"val\":\"val2\"}"}
-        assert.Contains(t, msgs, text)
-        checkMatchingHeaders(t, event, message.headers)
-    case <-timeout:
-        t.Fatal("timeout waiting for incoming events")
     }
 
     // Close the done channel and make sure the beat shuts down in a reasonable
@@ -374,17 +376,19 @@ func TestInputWithJsonPayloadAndMultipleEvents(t *testing.T) {
     input.Run()
 
     timeout := time.After(30 * time.Second)
-    select {
-    case event := <-events:
-        text, err := event.Fields.GetValue("val")
-        if err != nil {
-            t.Fatal(err)
+    for i := 0; i < 2; i++ {
+        select {
+        case event := <-events:
+            text, err := event.Fields.GetValue("val")
+            if err != nil {
+                t.Fatal(err)
+            }
+            msgs := []string{"val1", "val2"}
+            assert.Contains(t, msgs, text)
+            checkMatchingHeaders(t, event, message.headers)
+        case <-timeout:
+            t.Fatal("timeout waiting for incoming events")
         }
-        msgs := []string{"val1", "val2"}
-        assert.Contains(t, msgs, text)
-        checkMatchingHeaders(t, event, message.headers)
-    case <-timeout:
-        t.Fatal("timeout waiting for incoming events")
     }
 
     // Close the done channel and make sure the beat shuts down in a reasonable