[Filebeat] Kafka input, json payload #26833

Closed
Adding the option to use json payload as structured data on kafka input vs string with json
mjmbischoff committed Jul 11, 2021
commit 032dbac5c6d4a326e331e28e2458724333c8228c
4 changes: 4 additions & 0 deletions filebeat/docs/inputs/input-kafka.asciidoc
@@ -168,6 +168,10 @@ Configuration options for Kerberos authentication.

See <<configuration-kerberos>> for more information.

===== `payload_type`

This configures how the input handles the payload. Defaults to `"string"`, which puts the raw payload into the `message` field. The other option, `"json"`, attempts to parse the payload as JSON and merges the resulting structure into the top level of the event.
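
To make the behavior concrete, here is a minimal standalone Go sketch (not part of this change; it uses plain maps instead of Beats' `common.MapStr`) of how the two modes shape the event fields for a payload such as `{"val":"val1"}`:

package main

import (
    "encoding/json"
    "fmt"
)

// buildFields sketches the two payload_type modes: "string" keeps the raw
// payload under "message", while "json" parses it and merges its keys into
// the top level of the event, next to the "kafka" metadata.
func buildFields(payloadType string, payload []byte, kafkaFields map[string]interface{}) (map[string]interface{}, error) {
    fields := map[string]interface{}{"kafka": kafkaFields}
    switch payloadType {
    case "json":
        var obj map[string]interface{}
        if err := json.Unmarshal(payload, &obj); err != nil {
            return nil, err
        }
        for k, v := range obj {
            fields[k] = v
        }
    default: // "string"
        fields["message"] = string(payload)
    }
    return fields, nil
}

func main() {
    payload := []byte(`{"val":"val1"}`)
    kafka := map[string]interface{}{"topic": "example", "partition": int32(0)}

    asString, _ := buildFields("string", payload, kafka)
    asJSON, _ := buildFields("json", payload, kafka)
    fmt.Println(asString) // map[kafka:map[...] message:{"val":"val1"}]
    fmt.Println(asJSON)   // map[kafka:map[...] val:val1]
}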

[id="{beatname_lc}-input-{type}-common-options"]
include::../inputs/input-common-options.asciidoc[]

15 changes: 15 additions & 0 deletions filebeat/input/kafka/config.go
@@ -53,6 +53,7 @@ type kafkaInputConfig struct {
Username string `config:"username"`
Password string `config:"password"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
PayloadType string `config:"payload_type"`
}

type kafkaFetch struct {
@@ -127,6 +128,7 @@ func defaultConfig() kafkaInputConfig {
MaxRetries: 4,
RetryBackoff: 2 * time.Second,
},
PayloadType: "string",
}
}

@@ -143,6 +145,10 @@ func (c *kafkaInputConfig) Validate() error {
if c.Username != "" && c.Password == "" {
return fmt.Errorf("password must be set when username is configured")
}

if !stringInSlice(c.PayloadType, []string{"string", "json"}) {
return fmt.Errorf("invalid value for payload_type: %s, supported values are: string, json", c.PayloadType)
}
return nil
}

@@ -272,3 +278,12 @@ func (is *isolationLevel) Unpack(value string) error {
*is = isolationLevel
return nil
}

func stringInSlice(str string, list []string) bool {
for _, v := range list {
if v == str {
return true
}
}
return false
}
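
For a quick sanity check of the new rule, here is a standalone sketch that re-declares the helper from this diff and exercises the same payload_type check (the `validatePayloadType` wrapper and the `main` driver are ours, for illustration only):

package main

import "fmt"

// stringInSlice reports whether str is one of the values in list,
// mirroring the helper added to config.go above.
func stringInSlice(str string, list []string) bool {
    for _, v := range list {
        if v == str {
            return true
        }
    }
    return false
}

// validatePayloadType reproduces just the payload_type branch of Validate.
func validatePayloadType(payloadType string) error {
    if !stringInSlice(payloadType, []string{"string", "json"}) {
        return fmt.Errorf("invalid value for payload_type: %s, supported values are: string, json", payloadType)
    }
    return nil
}

func main() {
    fmt.Println(validatePayloadType("string")) // <nil>
    fmt.Println(validatePayloadType("json"))   // <nil>
    fmt.Println(validatePayloadType("avro"))   // invalid value for payload_type: avro, supported values are: string, json
}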
144 changes: 106 additions & 38 deletions filebeat/input/kafka/input.go
@@ -110,6 +110,7 @@ func (input *kafkaInput) runConsumerGroup(
outlet: input.outlet,
// expandEventListFromField will be assigned the configuration option expand_event_list_from_field
expandEventListFromField: input.config.ExpandEventListFromField,
payloadType: input.config.PayloadType,
log: input.log,
}

@@ -241,6 +242,7 @@ type groupHandler struct {
// if the fileset using this input expects to receive multiple messages bundled under a specific field then this value is assigned
// e.g. the azure filesets, where the events are found under the json object "records"
expandEventListFromField string
payloadType string
log *logp.Logger
}

@@ -275,30 +277,15 @@ func (h *groupHandler) createEvents(
kafkaFields["headers"] = arrayForKafkaHeaders(message.Headers)
}

// if expandEventListFromField has been set, then a check for the actual json object will be done and a return for multiple messages is executed
var events []beat.Event
var messages []string
if h.expandEventListFromField == "" {
messages = []string{string(message.Value)}
} else {
messages = h.parseMultipleMessages(message.Value)
switch h.payloadType {
case "string":
return h.parseStringPayload(message, timestamp, kafkaFields)
case "json":
return h.parseJsonPayload(message, timestamp, kafkaFields)
default:
return h.parseStringPayload(message, timestamp, kafkaFields)
}
for _, msg := range messages {
event := beat.Event{
Timestamp: timestamp,
Fields: common.MapStr{
"message": msg,
"kafka": kafkaFields,
},
Private: eventMeta{
handler: h,
message: message,
},
}
events = append(events, event)

}
return events
}

func (h *groupHandler) Setup(session sarama.ConsumerGroupSession) error {
@@ -335,24 +322,105 @@ func (h *groupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sara
return nil
}

// parseMultipleMessages will try to split the message into multiple ones based on the group field provided by the configuration
func (h *groupHandler) parseMultipleMessages(bMessage []byte) []string {
var obj map[string][]interface{}
err := json.Unmarshal(bMessage, &obj)
if err != nil {
h.log.Errorw(fmt.Sprintf("Kafka desirializing multiple messages using the group object %s", h.expandEventListFromField), "error", err)
return []string{}
func (h *groupHandler) parseStringPayload(message *sarama.ConsumerMessage, timestamp time.Time, kafkaFields common.MapStr) []beat.Event {
// if expandEventListFromField is set, the payload is parsed as a json object and one event is returned for each entry under that field
if h.expandEventListFromField == "" {
return []beat.Event{
{
Timestamp: timestamp,
Fields: common.MapStr{
"message": string(message.Value),
"kafka": kafkaFields,
},
Private: eventMeta{
handler: h,
message: message,
},
},
}
} else {
// try to split the message into multiple ones based on the group field provided by the configuration
var obj map[string][]common.MapStr
err := json.Unmarshal(message.Value, &obj)
if err != nil {
h.log.Errorw(fmt.Sprintf("Kafka deserializing multiple messages using the group object %s", h.expandEventListFromField), "error", err)
return []beat.Event{}
}
var events []beat.Event
if len(obj[h.expandEventListFromField]) > 0 {
for _, ms := range obj[h.expandEventListFromField] {
js, err := json.Marshal(ms)
if err == nil {
event := beat.Event{
Timestamp: timestamp,
Fields: common.MapStr{
"message": string(js),
"kafka": kafkaFields,
},
Private: eventMeta{
handler: h,
message: message,
},
}
events = append(events, event)
} else {
h.log.Errorw(fmt.Sprintf("Kafka serializing message %s", ms), "error", err)
}
}
}
return events
}
var messages []string
if len(obj[h.expandEventListFromField]) > 0 {
for _, ms := range obj[h.expandEventListFromField] {
js, err := json.Marshal(ms)
if err == nil {
messages = append(messages, string(js))
} else {
h.log.Errorw(fmt.Sprintf("Kafka serializing message %s", ms), "error", err)
}

func (h *groupHandler) parseJsonPayload(message *sarama.ConsumerMessage, timestamp time.Time, kafkaFields common.MapStr) []beat.Event {
// if expandEventListFromField is set, the payload is parsed as a json object and one event is returned for each entry under that field
if h.expandEventListFromField == "" {
var obj common.MapStr
err := json.Unmarshal(message.Value, &obj)
if err != nil {
h.log.Errorw("Kafka deserializing json payload failed", "error", err)
return []beat.Event{}
}
var fields = common.MapStr{
"kafka": kafkaFields,
}
fields.Update(obj)
return []beat.Event{
{
Timestamp: timestamp,
Fields: fields,
Private: eventMeta{
handler: h,
message: message,
},
},
}
} else {
// try to split the message into multiple ones based on the group field provided by the configuration
var obj map[string][]common.MapStr
err := json.Unmarshal(message.Value, &obj)
if err != nil {
h.log.Errorw(fmt.Sprintf("Kafka deserializing multiple messages using the group object %s", h.expandEventListFromField), "error", err)
return []beat.Event{}
}
var events []beat.Event
if len(obj[h.expandEventListFromField]) > 0 {
for _, ms := range obj[h.expandEventListFromField] {
var fields = common.MapStr{
"kafka": kafkaFields,
}
fields.Update(ms)
event := beat.Event{
Timestamp: timestamp,
Fields: fields,
Private: eventMeta{
handler: h,
message: message,
},
}
events = append(events, event)
}
}
return events
}
return messages
}
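
To show how `payload_type: json` combines with `expand_event_list_from_field`, here is a minimal standalone sketch (plain maps instead of `common.MapStr`; the `splitJSONList` name is ours) that splits a bundled payload, like the one used in the tests that follow, into one set of top-level fields per record:

package main

import (
    "encoding/json"
    "fmt"
)

// splitJSONList sketches the payload_type=json + expand_event_list_from_field
// path: the payload is parsed as {"<field>": [ {...}, {...} ]} and each entry
// becomes its own set of top-level event fields, with the Kafka metadata kept
// under "kafka".
func splitJSONList(payload []byte, field string, kafkaFields map[string]interface{}) ([]map[string]interface{}, error) {
    var obj map[string][]map[string]interface{}
    if err := json.Unmarshal(payload, &obj); err != nil {
        return nil, err
    }
    var out []map[string]interface{}
    for _, record := range obj[field] {
        fields := map[string]interface{}{"kafka": kafkaFields}
        for k, v := range record {
            fields[k] = v
        }
        out = append(out, fields)
    }
    return out, nil
}

func main() {
    payload := []byte(`{"records": [{"val":"val1"}, {"val":"val2"}]}`)
    kafka := map[string]interface{}{"topic": "example"}
    events, err := splitJSONList(payload, "records", kafka)
    if err != nil {
        panic(err)
    }
    for _, e := range events {
        fmt.Println(e) // first {kafka:..., val:val1}, then {kafka:..., val:val2}
    }
}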
147 changes: 147 additions & 0 deletions filebeat/input/kafka/kafka_integration_test.go
@@ -256,6 +256,153 @@ func TestInputWithMultipleEvents(t *testing.T) {
}
}

func TestInputWithJsonPayload(t *testing.T) {
id := strconv.Itoa(rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int())
testTopic := fmt.Sprintf("Filebeat-TestInput-%s", id)
context := input.Context{
Done: make(chan struct{}),
BeatDone: make(chan struct{}),
}

// Send test message to the topic for the input to read.
message := testMessage{
message: "{\"val\":\"val1\"}",
headers: []sarama.RecordHeader{
recordHeader("X-Test-Header", "test header value"),
},
}
writeToKafkaTopic(t, testTopic, message.message, message.headers, time.Second*20)

// Setup the input config
config := common.MustNewConfigFrom(common.MapStr{
"hosts": getTestKafkaHost(),
"topics": []string{testTopic},
"group_id": "filebeat",
"wait_close": 0,
"payload_type": "json",
})

// Route input events through our capturer instead of sending through ES.
events := make(chan beat.Event, 100)
defer close(events)
capturer := NewEventCapturer(events)
defer capturer.Close()
connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) {
return channel.SubOutlet(capturer), nil
})

input, err := NewInput(config, connector, context)
if err != nil {
t.Fatal(err)
}

// Run the input and wait for finalization
input.Run()

timeout := time.After(30 * time.Second)
select {
case event := <-events:
text, err := event.Fields.GetValue("val")
if err != nil {
t.Fatal(err)
}
msgs := []string{"val1"}
assert.Contains(t, msgs, text)
checkMatchingHeaders(t, event, message.headers)
case <-timeout:
t.Fatal("timeout waiting for incoming events")
}

// Close the done channel and make sure the beat shuts down in a reasonable
// amount of time.
close(context.Done)
didClose := make(chan struct{})
go func() {
input.Wait()
close(didClose)
}()

select {
case <-time.After(30 * time.Second):
t.Fatal("timeout waiting for beat to shut down")
case <-didClose:
}
}

func TestInputWithJsonPayloadAndMultipleEvents(t *testing.T) {
id := strconv.Itoa(rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int())
testTopic := fmt.Sprintf("Filebeat-TestInput-%s", id)
context := input.Context{
Done: make(chan struct{}),
BeatDone: make(chan struct{}),
}

// Send test messages to the topic for the input to read.
message := testMessage{
message: "{\"records\": [{\"val\":\"val1\"}, {\"val\":\"val2\"}]}",
headers: []sarama.RecordHeader{
recordHeader("X-Test-Header", "test header value"),
},
}
writeToKafkaTopic(t, testTopic, message.message, message.headers, time.Second*20)

// Setup the input config
config := common.MustNewConfigFrom(common.MapStr{
"hosts": getTestKafkaHost(),
"topics": []string{testTopic},
"group_id": "filebeat",
"wait_close": 0,
"expand_event_list_from_field": "records",
"payload_type": "json",
})

// Route input events through our capturer instead of sending through ES.
events := make(chan beat.Event, 100)
defer close(events)
capturer := NewEventCapturer(events)
defer capturer.Close()
connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) {
return channel.SubOutlet(capturer), nil
})

input, err := NewInput(config, connector, context)
if err != nil {
t.Fatal(err)
}

// Run the input and wait for finalization
input.Run()

timeout := time.After(30 * time.Second)
select {
case event := <-events:
text, err := event.Fields.GetValue("val")
if err != nil {
t.Fatal(err)
}
msgs := []string{"val1", "val2"}
assert.Contains(t, msgs, text)
checkMatchingHeaders(t, event, message.headers)
case <-timeout:
t.Fatal("timeout waiting for incoming events")
}

// Close the done channel and make sure the beat shuts down in a reasonable
// amount of time.
close(context.Done)
didClose := make(chan struct{})
go func() {
input.Wait()
close(didClose)
}()

select {
case <-time.After(30 * time.Second):
t.Fatal("timeout waiting for beat to shut down")
case <-didClose:
}
}

func findMessage(t *testing.T, text string, msgs []testMessage) *testMessage {
var msg *testMessage
for _, m := range msgs {