Modify kafka input to include parsing multiple messages (#13965)
* Modify kafka input to include parsing multiple messages

* Add changelog event

* Creating integration test

* rename property

* Modify changelog

* Rename config option

* run fmt update

* Spelling fix
narph authored Oct 9, 2019
1 parent 9098de2 commit 065f291
Showing 5 changed files with 160 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -333,6 +333,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add input httpjson. {issue}13545[13545] {pull}13546[13546]
- Filebeat Netflow input: Remove beta label. {pull}13858[13858]
- Remove `event.timezone` from events that don't need it in some modules that support log formats with and without timezones. {pull}13918[13918]
- Add ExpandEventListFromField config option in the kafka input. {pull}13965[13965]

*Heartbeat*
- Add non-privileged icmp on linux and darwin(mac). {pull}13795[13795] {issue}11498[11498]
11 changes: 11 additions & 0 deletions filebeat/docs/inputs/input-kafka.asciidoc
@@ -127,6 +127,17 @@ Kafka fetch settings:
*`max`*:: The maximum number of bytes to read per request. Defaults to 0
(no limit).

===== `expand_event_list_from_field`

If the fileset using this input expects to receive multiple messages bundled under a specific field, the config option `expand_event_list_from_field` can be set to the name of that field.
For example, in the case of the Azure filesets, the events are found under the JSON object "records":
```
{
"records": [ {event1}, {event2}]
}
```
This setting splits the messages grouped under that field ('records') into separate events; a sample configuration is shown below.
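
A minimal kafka input configuration using this option might look like the following sketch (the broker address, topic name, and group id are illustrative placeholders, not values mandated by this change):
```
filebeat.inputs:
- type: kafka
  hosts: ["localhost:9092"]       # placeholder broker address
  topics: ["azure-activity-logs"] # placeholder topic
  group_id: "filebeat"
  expand_event_list_from_field: "records"
```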

===== `rebalance`

Kafka rebalance settings:
33 changes: 17 additions & 16 deletions filebeat/input/kafka/config.go
@@ -34,22 +34,23 @@ import (

type kafkaInputConfig struct {
// Kafka hosts with port, e.g. "localhost:9092"
Hosts []string `config:"hosts" validate:"required"`
Topics []string `config:"topics" validate:"required"`
GroupID string `config:"group_id" validate:"required"`
ClientID string `config:"client_id"`
Version kafka.Version `config:"version"`
InitialOffset initialOffset `config:"initial_offset"`
ConnectBackoff time.Duration `config:"connect_backoff" validate:"min=0"`
ConsumeBackoff time.Duration `config:"consume_backoff" validate:"min=0"`
WaitClose time.Duration `config:"wait_close" validate:"min=0"`
MaxWaitTime time.Duration `config:"max_wait_time"`
IsolationLevel isolationLevel `config:"isolation_level"`
Fetch kafkaFetch `config:"fetch"`
Rebalance kafkaRebalance `config:"rebalance"`
TLS *tlscommon.Config `config:"ssl"`
Username string `config:"username"`
Password string `config:"password"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
}

type kafkaFetch struct {
73 changes: 58 additions & 15 deletions filebeat/input/kafka/input.go
@@ -19,6 +19,7 @@ package kafka

import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
@@ -107,6 +108,9 @@ func (input *kafkaInput) runConsumerGroup(
handler := &groupHandler{
version: input.config.Version,
outlet: input.outlet,
// expandEventListFromField is set from the expand_event_list_from_field configuration option
expandEventListFromField: input.config.ExpandEventListFromField,
log: input.log,
}

input.saramaWaitGroup.Add(1)
@@ -234,6 +238,10 @@ type groupHandler struct {
version kafka.Version
session sarama.ConsumerGroupSession
outlet channel.Outleter
// if the fileset using this input expects to receive multiple messages bundled under a specific field, that field's name is assigned here
// e.g. the azure filesets, where the events are found under the json object "records"
expandEventListFromField string
log *logp.Logger
}

// The metadata attached to incoming events so they can be ACKed once they've
@@ -243,11 +251,11 @@ type eventMeta struct {
message *sarama.ConsumerMessage
}

func (h *groupHandler) createEvent(
func (h *groupHandler) createEvents(
sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim,
message *sarama.ConsumerMessage,
) beat.Event {
) []beat.Event {
timestamp := time.Now()
kafkaFields := common.MapStr{
"topic": claim.Topic(),
@@ -266,19 +274,31 @@ func (h *groupHandler) createEvent(
if versionOk && version.IsAtLeast(sarama.V0_11_0_0) {
kafkaFields["headers"] = arrayForKafkaHeaders(message.Headers)
}

// if expandEventListFromField has been set, try to parse the message value as a json object and return one event per entry under that field
var events []beat.Event
var messages []string
if h.expandEventListFromField == "" {
messages = []string{string(message.Value)}
} else {
messages = h.parseMultipleMessages(message.Value)
}
for _, msg := range messages {
event := beat.Event{
Timestamp: timestamp,
Fields: common.MapStr{
"message": msg,
"kafka": kafkaFields,
},
Private: eventMeta{
handler: h,
message: message,
},
}
events = append(events, event)
}
return events
}

func (h *groupHandler) Setup(session sarama.ConsumerGroupSession) error {
@@ -307,8 +327,31 @@ func (h *groupHandler) ack(message *sarama.ConsumerMessage) {

func (h *groupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
event := h.createEvent(sess, claim, msg)
h.outlet.OnEvent(event)
events := h.createEvents(sess, claim, msg)
for _, event := range events {
h.outlet.OnEvent(event)
}
}
return nil
}

// parseMultipleMessages will try to split the message into multiple ones based on the group field provided by the configuration
func (h *groupHandler) parseMultipleMessages(bMessage []byte) []string {
var obj map[string][]interface{}
err := json.Unmarshal(bMessage, &obj)
if err != nil {
h.log.Errorw(fmt.Sprintf("Kafka desirializing multiple messages using the group object %s", h.expandEventListFromField), "error", err)
return []string{}
}
var messages []string
if len(obj[h.expandEventListFromField]) > 0 {
for _, ms := range obj[h.expandEventListFromField] {
js, err := json.Marshal(ms)
if err != nil {
h.log.Errorw(fmt.Sprintf("Kafka serializing message %s", ms), "error", err)
continue
}
messages = append(messages, string(js))
}
}
return messages
}
73 changes: 73 additions & 0 deletions filebeat/input/kafka/kafka_integration_test.go
@@ -178,6 +178,79 @@ func TestInput(t *testing.T) {
}
}

func TestInputWithMultipleEvents(t *testing.T) {
id := strconv.Itoa(rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int())
testTopic := fmt.Sprintf("Filebeat-TestInput-%s", id)
context := input.Context{
Done: make(chan struct{}),
BeatDone: make(chan struct{}),
}

// Send test messages to the topic for the input to read.
message := testMessage{
message: "{\"records\": [{\"val\":\"val1\"}, {\"val\":\"val2\"}]}",
headers: []sarama.RecordHeader{
recordHeader("X-Test-Header", "test header value"),
},
}
writeToKafkaTopic(t, testTopic, message.message, message.headers, time.Second*20)

// Setup the input config
config := common.MustNewConfigFrom(common.MapStr{
"hosts": getTestKafkaHost(),
"topics": []string{testTopic},
"group_id": "filebeat",
"wait_close": 0,
"expand_event_list_from_field": "records",
})

// Route input events through our capturer instead of sending through ES.
events := make(chan beat.Event, 100)
defer close(events)
capturer := NewEventCapturer(events)
defer capturer.Close()
connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) {
return channel.SubOutlet(capturer), nil
})

input, err := NewInput(config, connector, context)
if err != nil {
t.Fatal(err)
}

// Run the input and wait for finalization
input.Run()

timeout := time.After(30 * time.Second)
select {
case event := <-events:
text, err := event.Fields.GetValue("message")
if err != nil {
t.Fatal(err)
}
msgs := []string{"{\"val\":\"val1\"}", "{\"val\":\"val2\"}"}
assert.Contains(t, msgs, text)
checkMatchingHeaders(t, event, message.headers)
case <-timeout:
t.Fatal("timeout waiting for incoming events")
}

// Close the done channel and make sure the beat shuts down in a reasonable
// amount of time.
close(context.Done)
didClose := make(chan struct{})
go func() {
input.Wait()
close(didClose)
}()

select {
case <-time.After(30 * time.Second):
t.Fatal("timeout waiting for beat to shut down")
case <-didClose:
}
}

func checkMatchingHeaders(
t *testing.T, event beat.Event, expected []sarama.RecordHeader,
) {
