From f5f52f4f6796c81040cfe8a9e3db2660ff6c02d7 Mon Sep 17 00:00:00 2001
From: Mariana
Date: Tue, 8 Oct 2019 16:55:14 +0200
Subject: [PATCH 1/8] Modify kafka input to include parsing multiple messages

---
 filebeat/input/kafka/config.go | 33 ++++++++--------
 filebeat/input/kafka/input.go  | 69 ++++++++++++++++++++++++++--------
 2 files changed, 71 insertions(+), 31 deletions(-)

diff --git a/filebeat/input/kafka/config.go b/filebeat/input/kafka/config.go
index ddc505bf4c7..e563248bddf 100644
--- a/filebeat/input/kafka/config.go
+++ b/filebeat/input/kafka/config.go
@@ -34,22 +34,23 @@ import (
 
 type kafkaInputConfig struct {
 	// Kafka hosts with port, e.g. "localhost:9092"
-	Hosts          []string          `config:"hosts" validate:"required"`
-	Topics         []string          `config:"topics" validate:"required"`
-	GroupID        string            `config:"group_id" validate:"required"`
-	ClientID       string            `config:"client_id"`
-	Version        kafka.Version     `config:"version"`
-	InitialOffset  initialOffset     `config:"initial_offset"`
-	ConnectBackoff time.Duration     `config:"connect_backoff" validate:"min=0"`
-	ConsumeBackoff time.Duration     `config:"consume_backoff" validate:"min=0"`
-	WaitClose      time.Duration     `config:"wait_close" validate:"min=0"`
-	MaxWaitTime    time.Duration     `config:"max_wait_time"`
-	IsolationLevel isolationLevel    `config:"isolation_level"`
-	Fetch          kafkaFetch        `config:"fetch"`
-	Rebalance      kafkaRebalance    `config:"rebalance"`
-	TLS            *tlscommon.Config `config:"ssl"`
-	Username       string            `config:"username"`
-	Password       string            `config:"password"`
+	Hosts                []string          `config:"hosts" validate:"required"`
+	Topics               []string          `config:"topics" validate:"required"`
+	GroupID              string            `config:"group_id" validate:"required"`
+	ClientID             string            `config:"client_id"`
+	Version              kafka.Version     `config:"version"`
+	InitialOffset        initialOffset     `config:"initial_offset"`
+	ConnectBackoff       time.Duration     `config:"connect_backoff" validate:"min=0"`
+	ConsumeBackoff       time.Duration     `config:"consume_backoff" validate:"min=0"`
+	WaitClose            time.Duration     `config:"wait_close" validate:"min=0"`
+	MaxWaitTime          time.Duration     `config:"max_wait_time"`
+	IsolationLevel       isolationLevel    `config:"isolation_level"`
+	Fetch                kafkaFetch        `config:"fetch"`
+	Rebalance            kafkaRebalance    `config:"rebalance"`
+	TLS                  *tlscommon.Config `config:"ssl"`
+	Username             string            `config:"username"`
+	Password             string            `config:"password"`
+	YieldEventsFromField string            `config:"yield_events_from_field"`
 }
 
 type kafkaFetch struct {
diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go
index 98b7f15c1f0..5a8c55222ff 100644
--- a/filebeat/input/kafka/input.go
+++ b/filebeat/input/kafka/input.go
@@ -19,6 +19,7 @@ package kafka
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"strings"
 	"sync"
@@ -107,6 +108,8 @@ func (input *kafkaInput) runConsumerGroup(
 	handler := &groupHandler{
 		version: input.config.Version,
 		outlet:  input.outlet,
+		// yieldEventsFromField will be assigned the configuration option yield_events_from_field
+		yieldEventsFromField: input.config.YieldEventsFromField,
 	}
 
 	input.saramaWaitGroup.Add(1)
@@ -234,6 +237,9 @@ type groupHandler struct {
 	version kafka.Version
 	session sarama.ConsumerGroupSession
 	outlet  channel.Outleter
+	// if the fileset using this input expects to receive multiple messages bundled under a specific field then this value is assigned
+	// ex. in this case are the azure filesets where the events are found under the json object "records"
+	yieldEventsFromField string
 }
 
 // The metadata attached to incoming events so they can be ACKed once they've
@@ -243,11 +249,11 @@ type eventMeta struct {
 	message *sarama.ConsumerMessage
 }
 
-func (h *groupHandler) createEvent(
+func (h *groupHandler) createEvents(
 	sess sarama.ConsumerGroupSession,
 	claim sarama.ConsumerGroupClaim,
 	message *sarama.ConsumerMessage,
-) beat.Event {
+) []beat.Event {
 	timestamp := time.Now()
 	kafkaFields := common.MapStr{
 		"topic":     claim.Topic(),
@@ -266,19 +272,31 @@ func (h *groupHandler) createEvent(
 	if versionOk && version.IsAtLeast(sarama.V0_11_0_0) {
 		kafkaFields["headers"] = arrayForKafkaHeaders(message.Headers)
 	}
-	event := beat.Event{
-		Timestamp: timestamp,
-		Fields: common.MapStr{
-			"message": string(message.Value),
-			"kafka":   kafkaFields,
-		},
-		Private: eventMeta{
-			handler: h,
-			message: message,
-		},
+
+	// if yieldEventsFromField has been set, then a check for the actual json object will be done and a return for multiple messages is executed
+	var events []beat.Event
+	var messages []string
+	if h.yieldEventsFromField == "" {
+		messages = []string{string(message.Value)}
+	} else {
+		messages = h.parseMultipleMessages(message.Value)
 	}
+	for _, msg := range messages {
+		event := beat.Event{
+			Timestamp: timestamp,
+			Fields: common.MapStr{
+				"message": msg,
+				"kafka":   kafkaFields,
+			},
+			Private: eventMeta{
+				handler: h,
+				message: message,
+			},
+		}
+		events = append(events, event)
 
-	return event
+	}
+	return events
 }
 
 func (h *groupHandler) Setup(session sarama.ConsumerGroupSession) error {
@@ -307,8 +325,29 @@ func (h *groupHandler) ack(message *sarama.ConsumerMessage) {
 
 func (h *groupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 	for msg := range claim.Messages() {
-		event := h.createEvent(sess, claim, msg)
-		h.outlet.OnEvent(event)
+		events := h.createEvents(sess, claim, msg)
+		for _, event := range events {
+			h.outlet.OnEvent(event)
+		}
 	}
 	return nil
 }
+
+// parseMultipleMessages will try to split the message into multiple ones based on the group field provided by the configuration
+func (h *groupHandler) parseMultipleMessages(bMessage []byte) []string {
+	var messages []string
+	var obj map[string][]interface{}
+	err := json.Unmarshal(bMessage, &obj)
+	if err != nil {
+		return messages
+	}
+	if len(obj[h.yieldEventsFromField]) > 0 {
+		for _, ms := range obj[h.yieldEventsFromField] {
+			js, err := json.Marshal(ms)
+			if err == nil {
+				messages = append(messages, string(js))
+			}
+		}
+	}
+	return messages
+}

From 7d50f3f7cece0f1e2567246443bc8522337af167 Mon Sep 17 00:00:00 2001
From: Mariana
Date: Tue, 8 Oct 2019 17:11:17 +0200
Subject: [PATCH 2/8] Add changelog event

---
 CHANGELOG.next.asciidoc | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 06fa1bcf228..5b56988ff7e 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -333,6 +333,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Add input httpjson. {issue}13545[13545] {pull}13546[13546]
 - Filebeat Netflow input: Remove beta label. {pull}13858[13858]
 - Remove `event.timezone` from events that don't need it in some modules that support log formats with and without timezones. {pull}13918[13918]
+- Add YieldEventsFromField config option in the kafka input. {pull}13965[13965]
 
 *Heartbeat*
 - Add non-privileged icmp on linux and darwin(mac). {pull}13795[13795] {issue}11498[11498]
From d951bc39620d11eb5f9e58b707fe41ba3772b8ce Mon Sep 17 00:00:00 2001
From: Mariana
Date: Tue, 8 Oct 2019 19:39:47 +0200
Subject: [PATCH 3/8] Creating integration test

---
 .../input/kafka/kafka_integration_test.go | 73 +++++++++++++++++++
 1 file changed, 73 insertions(+)

diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go
index 71e44cf6219..1c053d6b648 100644
--- a/filebeat/input/kafka/kafka_integration_test.go
+++ b/filebeat/input/kafka/kafka_integration_test.go
@@ -178,6 +178,79 @@ func TestInput(t *testing.T) {
 	}
 }
 
+func TestInputWithMultipleEvents(t *testing.T) {
+	id := strconv.Itoa(rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int())
+	testTopic := fmt.Sprintf("Filebeat-TestInput-%s", id)
+	context := input.Context{
+		Done:     make(chan struct{}),
+		BeatDone: make(chan struct{}),
+	}
+
+	// Send test messages to the topic for the input to read.
+	message := testMessage{
+		message: "{\"records\": [{\"val\": \"val1\"}, {\"val\": \"val2\"}]}",
+		headers: []sarama.RecordHeader{
+			recordHeader("X-Test-Header", "test header value"),
+		},
+	}
+	writeToKafkaTopic(t, testTopic, message.message, message.headers, time.Second*20)
+
+	// Setup the input config
+	config := common.MustNewConfigFrom(common.MapStr{
+		"hosts":                   getTestKafkaHost(),
+		"topics":                  []string{testTopic},
+		"group_id":                "filebeat",
+		"wait_close":              0,
+		"yield_events_from_field": "records",
+	})
+
+	// Route input events through our capturer instead of sending through ES.
+	events := make(chan beat.Event, 100)
+	defer close(events)
+	capturer := NewEventCapturer(events)
+	defer capturer.Close()
+	connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) {
+		return channel.SubOutlet(capturer), nil
+	})
+
+	input, err := NewInput(config, connector, context)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Run the input and wait for finalization
+	input.Run()
+
+	timeout := time.After(30 * time.Second)
+	select {
+	case event := <-events:
+		text, err := event.Fields.GetValue("message")
+		if err != nil {
+			t.Fatal(err)
+		}
+		msgs := []string{"{\"val\": \"val1\"}", "{\"val\": \"val2\"}"}
+		assert.Contains(t, text, msgs)
+		checkMatchingHeaders(t, event, message.headers)
+	case <-timeout:
+		t.Fatal("timeout waiting for incoming events")
+	}
+
+	// Close the done channel and make sure the beat shuts down in a reasonable
+	// amount of time.
+	close(context.Done)
+	didClose := make(chan struct{})
+	go func() {
+		input.Wait()
+		close(didClose)
+	}()
+
+	select {
+	case <-time.After(30 * time.Second):
+		t.Fatal("timeout waiting for beat to shut down")
+	case <-didClose:
+	}
+}
+
 func checkMatchingHeaders(
 	t *testing.T, event beat.Event, expected []sarama.RecordHeader,
 ) {

From 8ed8fb6416ab7b51db71d8ecc1cb7cfd97786b1f Mon Sep 17 00:00:00 2001
From: Mariana
Date: Tue, 8 Oct 2019 20:10:54 +0200
Subject: [PATCH 4/8] rename property

---
 filebeat/input/kafka/config.go            | 34 +++++++++----------
 filebeat/input/kafka/input.go             | 22 +++++++-----
 .../input/kafka/kafka_integration_test.go | 16 ++++-----
 3 files changed, 38 insertions(+), 34 deletions(-)

diff --git a/filebeat/input/kafka/config.go b/filebeat/input/kafka/config.go
index e563248bddf..96e7ae02518 100644
--- a/filebeat/input/kafka/config.go
+++ b/filebeat/input/kafka/config.go
@@ -34,23 +34,23 @@ import (
 
 type kafkaInputConfig struct {
 	// Kafka hosts with port, e.g. "localhost:9092"
-	Hosts                []string          `config:"hosts" validate:"required"`
-	Topics               []string          `config:"topics" validate:"required"`
-	GroupID              string            `config:"group_id" validate:"required"`
-	ClientID             string            `config:"client_id"`
-	Version              kafka.Version     `config:"version"`
-	InitialOffset        initialOffset     `config:"initial_offset"`
-	ConnectBackoff       time.Duration     `config:"connect_backoff" validate:"min=0"`
-	ConsumeBackoff       time.Duration     `config:"consume_backoff" validate:"min=0"`
-	WaitClose            time.Duration     `config:"wait_close" validate:"min=0"`
-	MaxWaitTime          time.Duration     `config:"max_wait_time"`
-	IsolationLevel       isolationLevel    `config:"isolation_level"`
-	Fetch                kafkaFetch        `config:"fetch"`
-	Rebalance            kafkaRebalance    `config:"rebalance"`
-	TLS                  *tlscommon.Config `config:"ssl"`
-	Username             string            `config:"username"`
-	Password             string            `config:"password"`
-	YieldEventsFromField string            `config:"yield_events_from_field"`
+	Hosts                    []string          `config:"hosts" validate:"required"`
+	Topics                   []string          `config:"topics" validate:"required"`
+	GroupID                  string            `config:"group_id" validate:"required"`
+	ClientID                 string            `config:"client_id"`
+	Version                  kafka.Version     `config:"version"`
+	InitialOffset            initialOffset     `config:"initial_offset"`
+	ConnectBackoff           time.Duration     `config:"connect_backoff" validate:"min=0"`
+	ConsumeBackoff           time.Duration     `config:"consume_backoff" validate:"min=0"`
+	WaitClose                time.Duration     `config:"wait_close" validate:"min=0"`
+	MaxWaitTime              time.Duration     `config:"max_wait_time"`
+	IsolationLevel           isolationLevel    `config:"isolation_level"`
+	Fetch                    kafkaFetch        `config:"fetch"`
+	Rebalance                kafkaRebalance    `config:"rebalance"`
+	TLS                      *tlscommon.Config `config:"ssl"`
+	Username                 string            `config:"username"`
+	Password                 string            `config:"password"`
+	ExpandEventListFromField string            `config:"expand_events_from_field"`
 }
 
 type kafkaFetch struct {
diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go
index 5a8c55222ff..66272a1b529 100644
--- a/filebeat/input/kafka/input.go
+++ b/filebeat/input/kafka/input.go
@@ -108,8 +108,9 @@ func (input *kafkaInput) runConsumerGroup(
 	handler := &groupHandler{
 		version: input.config.Version,
 		outlet:  input.outlet,
-		// yieldEventsFromField will be assigned the configuration option yield_events_from_field
-		yieldEventsFromField: input.config.YieldEventsFromField,
+		// expandEventListFromField will be assigned the configuration option yield_events_from_field
+		expandEventListFromField: input.config.ExpandEventListFromField,
+		log:                      input.log,
 	}
 
 	input.saramaWaitGroup.Add(1)
@@ -239,7 +240,8 @@ type groupHandler struct {
 	outlet  channel.Outleter
 	// if the fileset using this input expects to receive multiple messages bundled under a specific field then this value is assigned
 	// ex. in this case are the azure filesets where the events are found under the json object "records"
-	yieldEventsFromField string
+	expandEventListFromField string
+	log                      *logp.Logger
 }
 
 // The metadata attached to incoming events so they can be ACKed once they've
@@ -273,10 +275,10 @@ func (h *groupHandler) createEvents(
 		kafkaFields["headers"] = arrayForKafkaHeaders(message.Headers)
 	}
 
-	// if yieldEventsFromField has been set, then a check for the actual json object will be done and a return for multiple messages is executed
+	// if expandEventListFromField has been set, then a check for the actual json object will be done and a return for multiple messages is executed
 	var events []beat.Event
 	var messages []string
-	if h.yieldEventsFromField == "" {
+	if h.expandEventListFromField == "" {
 		messages = []string{string(message.Value)}
 	} else {
 		messages = h.parseMultipleMessages(message.Value)
@@ -335,16 +337,18 @@ func (h *groupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sara
 
 // parseMultipleMessages will try to split the message into multiple ones based on the group field provided by the configuration
 func (h *groupHandler) parseMultipleMessages(bMessage []byte) []string {
-	var messages []string
 	var obj map[string][]interface{}
 	err := json.Unmarshal(bMessage, &obj)
 	if err != nil {
-		return messages
+		h.log.Errorw(fmt.Sprintf("Kafka deserializing multiple messages using the group object %s", h.expandEventListFromField), "error", err)
+		return []string{}
 	}
-	if len(obj[h.yieldEventsFromField]) > 0 {
-		for _, ms := range obj[h.yieldEventsFromField] {
+	var messages []string
+	if len(obj[h.expandEventListFromField]) > 0 {
+		for _, ms := range obj[h.expandEventListFromField] {
 			js, err := json.Marshal(ms)
 			if err == nil {
 				messages = append(messages, string(js))
+			} else {
+				h.log.Errorw(fmt.Sprintf("Kafka serializing message %s", ms), "error", err)
 			}
 		}
diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go
index 1c053d6b648..42680d9f202 100644
--- a/filebeat/input/kafka/kafka_integration_test.go
+++ b/filebeat/input/kafka/kafka_integration_test.go
@@ -188,7 +188,7 @@ func TestInputWithMultipleEvents(t *testing.T) {
 
 	// Send test messages to the topic for the input to read.
 	message := testMessage{
-		message: "{\"records\": [{\"val\": \"val1\"}, {\"val\": \"val2\"}]}",
+		message: "{\"records\": [{\"val\":\"val1\"}, {\"val\":\"val2\"}]}",
 		headers: []sarama.RecordHeader{
 			recordHeader("X-Test-Header", "test header value"),
 		},
@@ -197,11 +197,11 @@ func TestInputWithMultipleEvents(t *testing.T) {
 
 	// Setup the input config
 	config := common.MustNewConfigFrom(common.MapStr{
-		"hosts":                   getTestKafkaHost(),
-		"topics":                  []string{testTopic},
-		"group_id":                "filebeat",
-		"wait_close":              0,
-		"yield_events_from_field": "records",
+		"hosts":                    getTestKafkaHost(),
+		"topics":                   []string{testTopic},
+		"group_id":                 "filebeat",
+		"wait_close":               0,
+		"expand_events_from_field": "records",
 	})
 
 	// Route input events through our capturer instead of sending through ES.
@@ -228,8 +228,8 @@ func TestInputWithMultipleEvents(t *testing.T) {
 		if err != nil {
 			t.Fatal(err)
 		}
-		msgs := []string{"{\"val\": \"val1\"}", "{\"val\": \"val2\"}"}
-		assert.Contains(t, text, msgs)
+		msgs := []string{"{\"val\":\"val1\"}", "{\"val\":\"val2\"}"}
+		assert.Contains(t, msgs, text)
 		checkMatchingHeaders(t, event, message.headers)
 	case <-timeout:
 		t.Fatal("timeout waiting for incoming events")

From 3ecf81fcb7b9de629b460c4a224b0d75d7295cfb Mon Sep 17 00:00:00 2001
From: Mariana
Date: Tue, 8 Oct 2019 20:13:52 +0200
Subject: [PATCH 5/8] Modify changelog

---
 CHANGELOG.next.asciidoc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 5b56988ff7e..081f6bacac4 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -333,7 +333,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Add input httpjson. {issue}13545[13545] {pull}13546[13546]
 - Filebeat Netflow input: Remove beta label. {pull}13858[13858]
 - Remove `event.timezone` from events that don't need it in some modules that support log formats with and without timezones. {pull}13918[13918]
-- Add YieldEventsFromField config option in the kafka input. {pull}13965[13965]
+- Add ExpandEventListFromField config option in the kafka input. {pull}13965[13965]
 
 *Heartbeat*
 - Add non-privileged icmp on linux and darwin(mac). {pull}13795[13795] {issue}11498[11498]

From 28ecc926f2a7efa49eecf46e30112e863a678474 Mon Sep 17 00:00:00 2001
From: Mariana
Date: Tue, 8 Oct 2019 20:31:32 +0200
Subject: [PATCH 6/8] Rename config option

---
 filebeat/docs/inputs/input-kafka.asciidoc      | 11 +++++++++++
 filebeat/input/kafka/config.go                 |  2 +-
 filebeat/input/kafka/input.go                  |  2 +-
 filebeat/input/kafka/kafka_integration_test.go |  2 +-
 4 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/filebeat/docs/inputs/input-kafka.asciidoc b/filebeat/docs/inputs/input-kafka.asciidoc
index c40c2c69535..f96ff330cf9 100644
--- a/filebeat/docs/inputs/input-kafka.asciidoc
+++ b/filebeat/docs/inputs/input-kafka.asciidoc
@@ -127,6 +127,17 @@ Kafka fetch settings:
 
 *`max`*:: The maximum number of bytes to read per request. Defaults to 0 (no limit).
 
+===== `expand_event_list_from_field`
+
+If the fileset using this input expects to receive multiple messages bundled under a specific field then the config option `expand_event_list_from_field` value can be assigned the name of the json group object.
+For example in the case of azure fielsets the events are found under the json object "records".
+```
+{
+"records": [ {event1}, {event2}]
+}
+```
+This setting will be able to split the messages under the group value ('records') into separate events.
+
 ===== `rebalance`
 
 Kafka rebalance settings:
diff --git a/filebeat/input/kafka/config.go b/filebeat/input/kafka/config.go
index 96e7ae02518..6fb14730aea 100644
--- a/filebeat/input/kafka/config.go
+++ b/filebeat/input/kafka/config.go
@@ -50,7 +50,7 @@ type kafkaInputConfig struct {
 	TLS                      *tlscommon.Config `config:"ssl"`
 	Username                 string            `config:"username"`
 	Password                 string            `config:"password"`
-	ExpandEventListFromField string            `config:"expand_events_from_field"`
+	ExpandEventListFromField string            `config:"expand_event_list_from_field"`
 }
 
 type kafkaFetch struct {
diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go
index 66272a1b529..6feb5cfd742 100644
--- a/filebeat/input/kafka/input.go
+++ b/filebeat/input/kafka/input.go
@@ -108,7 +108,7 @@ func (input *kafkaInput) runConsumerGroup(
 	handler := &groupHandler{
 		version: input.config.Version,
 		outlet:  input.outlet,
-		// expandEventListFromField will be assigned the configuration option yield_events_from_field
+		// expandEventListFromField will be assigned the configuration option expand_event_list_from_field
 		expandEventListFromField: input.config.ExpandEventListFromField,
 		log:                      input.log,
 	}
diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go
index 42680d9f202..959f270b580 100644
--- a/filebeat/input/kafka/kafka_integration_test.go
+++ b/filebeat/input/kafka/kafka_integration_test.go
@@ -201,7 +201,7 @@ func TestInputWithMultipleEvents(t *testing.T) {
 		"topics":                   []string{testTopic},
 		"group_id":                 "filebeat",
 		"wait_close":               0,
-		"expand_events_from_field": "records",
+		"expand_event_list_from_field": "records",
 	})
 
 	// Route input events through our capturer instead of sending through ES.

From 73d80e3911e3e8a07c43d351d13934c10468774f Mon Sep 17 00:00:00 2001
From: Mariana
Date: Tue, 8 Oct 2019 20:55:55 +0200
Subject: [PATCH 7/8] run fmt update

---
 filebeat/input/kafka/kafka_integration_test.go | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go
index 959f270b580..02d7adcc075 100644
--- a/filebeat/input/kafka/kafka_integration_test.go
+++ b/filebeat/input/kafka/kafka_integration_test.go
@@ -197,10 +197,10 @@ func TestInputWithMultipleEvents(t *testing.T) {
 
 	// Setup the input config
 	config := common.MustNewConfigFrom(common.MapStr{
-		"hosts":                    getTestKafkaHost(),
-		"topics":                   []string{testTopic},
-		"group_id":                 "filebeat",
-		"wait_close":               0,
+		"hosts":                        getTestKafkaHost(),
+		"topics":                       []string{testTopic},
+		"group_id":                     "filebeat",
+		"wait_close":                   0,
 		"expand_event_list_from_field": "records",
 	})
 

From 0bdc6c76acc3389da0add3a3a5160db23196fab8 Mon Sep 17 00:00:00 2001
From: Mariana
Date: Wed, 9 Oct 2019 08:55:53 +0200
Subject: [PATCH 8/8] Spelling fix

---
 filebeat/docs/inputs/input-kafka.asciidoc | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/filebeat/docs/inputs/input-kafka.asciidoc b/filebeat/docs/inputs/input-kafka.asciidoc
index f96ff330cf9..2c265fd974e 100644
--- a/filebeat/docs/inputs/input-kafka.asciidoc
+++ b/filebeat/docs/inputs/input-kafka.asciidoc
@@ -129,8 +129,8 @@ Kafka fetch settings:
 
 ===== `expand_event_list_from_field`
 
-If the fileset using this input expects to receive multiple messages bundled under a specific field then the config option `expand_event_list_from_field` value can be assigned the name of the json group object.
-For example in the case of azure fielsets the events are found under the json object "records".
+If the fileset using this input expects to receive multiple messages bundled under a specific field then the config option `expand_event_list_from_field` value can be assigned the name of the field.
+For example in the case of azure filesets the events are found under the json object "records".
 ```
 {
 "records": [ {event1}, {event2}]
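
For reference, a minimal sketch of how the `expand_event_list_from_field` option introduced by this patch series would be used in a kafka input configuration. The broker address, topic name, and group id below are illustrative placeholders, not values taken from these patches:

```yaml
filebeat.inputs:
- type: kafka
  # Placeholder connection settings; adjust for your environment.
  hosts: ["localhost:9092"]
  topics: ["azure-logs"]
  group_id: "filebeat"
  # Treat each Kafka message as a JSON object and emit one event per element
  # of its "records" array instead of one event for the whole message.
  expand_event_list_from_field: "records"
```

With this configuration a message such as `{"records": [{event1}, {event2}]}` is split so that each element of `records` becomes the `message` field of its own event, which is the behaviour exercised by `TestInputWithMultipleEvents` above.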