Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify kafka input to include parsing multiple messages #13965

Merged
merged 8 commits into from
Oct 9, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add input httpjson. {issue}13545[13545] {pull}13546[13546]
- Filebeat Netflow input: Remove beta label. {pull}13858[13858]
- Remove `event.timezone` from events that don't need it in some modules that support log formats with and without timezones. {pull}13918[13918]
- Add ExpandEventListFromField config option in the kafka input. {pull}13965[13965]

*Heartbeat*
- Add non-privileged icmp on linux and darwin(mac). {pull}13795[13795] {issue}11498[11498]
Expand Down
11 changes: 11 additions & 0 deletions filebeat/docs/inputs/input-kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,17 @@ Kafka fetch settings:
*`max`*:: The maximum number of bytes to read per request. Defaults to 0
(no limit).

===== `expand_event_list_from_field`

If the fileset using this input expects to receive multiple messages bundled under a specific field then the config option `expand_event_list_from_field` value can be assigned the name of the json group object.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"the name of the json group object" -> "the name of the field" is I think clearer about what value should go in the config

"fielsets" -> "filesets" in the following line

For example in the case of azure fielsets the events are found under the json object "records".
```
{
"records": [ {event1}, {event2}]
}
```
This setting will be able to split the messages under the group value ('records') into separate events.

===== `rebalance`

Kafka rebalance settings:
Expand Down
33 changes: 17 additions & 16 deletions filebeat/input/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,23 @@ import (

type kafkaInputConfig struct {
// Kafka hosts with port, e.g. "localhost:9092"
Hosts []string `config:"hosts" validate:"required"`
Topics []string `config:"topics" validate:"required"`
GroupID string `config:"group_id" validate:"required"`
ClientID string `config:"client_id"`
Version kafka.Version `config:"version"`
InitialOffset initialOffset `config:"initial_offset"`
ConnectBackoff time.Duration `config:"connect_backoff" validate:"min=0"`
ConsumeBackoff time.Duration `config:"consume_backoff" validate:"min=0"`
WaitClose time.Duration `config:"wait_close" validate:"min=0"`
MaxWaitTime time.Duration `config:"max_wait_time"`
IsolationLevel isolationLevel `config:"isolation_level"`
Fetch kafkaFetch `config:"fetch"`
Rebalance kafkaRebalance `config:"rebalance"`
TLS *tlscommon.Config `config:"ssl"`
Username string `config:"username"`
Password string `config:"password"`
Hosts []string `config:"hosts" validate:"required"`
Topics []string `config:"topics" validate:"required"`
GroupID string `config:"group_id" validate:"required"`
ClientID string `config:"client_id"`
Version kafka.Version `config:"version"`
InitialOffset initialOffset `config:"initial_offset"`
ConnectBackoff time.Duration `config:"connect_backoff" validate:"min=0"`
ConsumeBackoff time.Duration `config:"consume_backoff" validate:"min=0"`
WaitClose time.Duration `config:"wait_close" validate:"min=0"`
MaxWaitTime time.Duration `config:"max_wait_time"`
IsolationLevel isolationLevel `config:"isolation_level"`
Fetch kafkaFetch `config:"fetch"`
Rebalance kafkaRebalance `config:"rebalance"`
TLS *tlscommon.Config `config:"ssl"`
Username string `config:"username"`
Password string `config:"password"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
}

type kafkaFetch struct {
Expand Down
73 changes: 58 additions & 15 deletions filebeat/input/kafka/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package kafka

import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -107,6 +108,9 @@ func (input *kafkaInput) runConsumerGroup(
handler := &groupHandler{
version: input.config.Version,
outlet: input.outlet,
// expandEventListFromField will be assigned the configuration option expand_event_list_from_field
expandEventListFromField: input.config.ExpandEventListFromField,
log: input.log,
}

input.saramaWaitGroup.Add(1)
Expand Down Expand Up @@ -234,6 +238,10 @@ type groupHandler struct {
version kafka.Version
session sarama.ConsumerGroupSession
outlet channel.Outleter
// if the fileset using this input expects to receive multiple messages bundled under a specific field then this value is assigned
// ex. in this case are the azure fielsets where the events are found under the json object "records"
expandEventListFromField string
log *logp.Logger
}

// The metadata attached to incoming events so they can be ACKed once they've
Expand All @@ -243,11 +251,11 @@ type eventMeta struct {
message *sarama.ConsumerMessage
}

func (h *groupHandler) createEvent(
func (h *groupHandler) createEvents(
sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim,
message *sarama.ConsumerMessage,
) beat.Event {
) []beat.Event {
timestamp := time.Now()
kafkaFields := common.MapStr{
"topic": claim.Topic(),
Expand All @@ -266,19 +274,31 @@ func (h *groupHandler) createEvent(
if versionOk && version.IsAtLeast(sarama.V0_11_0_0) {
kafkaFields["headers"] = arrayForKafkaHeaders(message.Headers)
}
event := beat.Event{
Timestamp: timestamp,
Fields: common.MapStr{
"message": string(message.Value),
"kafka": kafkaFields,
},
Private: eventMeta{
handler: h,
message: message,
},

// if expandEventListFromField has been set, then a check for the actual json object will be done and a return for multiple messages is executed
var events []beat.Event
var messages []string
if h.expandEventListFromField == "" {
messages = []string{string(message.Value)}
} else {
messages = h.parseMultipleMessages(message.Value)
}
for _, msg := range messages {
event := beat.Event{
Timestamp: timestamp,
Fields: common.MapStr{
"message": msg,
"kafka": kafkaFields,
},
Private: eventMeta{
handler: h,
message: message,
},
}
events = append(events, event)

return event
}
return events
}

func (h *groupHandler) Setup(session sarama.ConsumerGroupSession) error {
Expand Down Expand Up @@ -307,8 +327,31 @@ func (h *groupHandler) ack(message *sarama.ConsumerMessage) {

func (h *groupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
event := h.createEvent(sess, claim, msg)
h.outlet.OnEvent(event)
events := h.createEvents(sess, claim, msg)
for _, event := range events {
h.outlet.OnEvent(event)
}
}
return nil
}

// parseMultipleMessages will try to split the message into multiple ones based on the group field provided by the configuration
func (h *groupHandler) parseMultipleMessages(bMessage []byte) []string {
narph marked this conversation as resolved.
Show resolved Hide resolved
var obj map[string][]interface{}
err := json.Unmarshal(bMessage, &obj)
if err != nil {
h.log.Errorw(fmt.Sprintf("Kafka desirializing multiple messages using the group object %s", h.expandEventListFromField), "error", err)
return []string{}
}
var messages []string
if len(obj[h.expandEventListFromField]) > 0 {
for _, ms := range obj[h.expandEventListFromField] {
js, err := json.Marshal(ms)
if err == nil {
h.log.Errorw(fmt.Sprintf("Kafka serializing message %s", ms), "error", err)
messages = append(messages, string(js))
}
}
}
return messages
}
73 changes: 73 additions & 0 deletions filebeat/input/kafka/kafka_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,79 @@ func TestInput(t *testing.T) {
}
}

func TestInputWithMultipleEvents(t *testing.T) {
id := strconv.Itoa(rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int())
testTopic := fmt.Sprintf("Filebeat-TestInput-%s", id)
context := input.Context{
Done: make(chan struct{}),
BeatDone: make(chan struct{}),
}

// Send test messages to the topic for the input to read.
message := testMessage{
message: "{\"records\": [{\"val\":\"val1\"}, {\"val\":\"val2\"}]}",
headers: []sarama.RecordHeader{
recordHeader("X-Test-Header", "test header value"),
},
}
writeToKafkaTopic(t, testTopic, message.message, message.headers, time.Second*20)

// Setup the input config
config := common.MustNewConfigFrom(common.MapStr{
"hosts": getTestKafkaHost(),
"topics": []string{testTopic},
"group_id": "filebeat",
"wait_close": 0,
"expand_event_list_from_field": "records",
})

// Route input events through our capturer instead of sending through ES.
events := make(chan beat.Event, 100)
defer close(events)
capturer := NewEventCapturer(events)
defer capturer.Close()
connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) {
return channel.SubOutlet(capturer), nil
})

input, err := NewInput(config, connector, context)
if err != nil {
t.Fatal(err)
}

// Run the input and wait for finalization
input.Run()

timeout := time.After(30 * time.Second)
select {
case event := <-events:
text, err := event.Fields.GetValue("message")
if err != nil {
t.Fatal(err)
}
msgs := []string{"{\"val\":\"val1\"}", "{\"val\":\"val2\"}"}
assert.Contains(t, msgs, text)
checkMatchingHeaders(t, event, message.headers)
case <-timeout:
t.Fatal("timeout waiting for incoming events")
}

// Close the done channel and make sure the beat shuts down in a reasonable
// amount of time.
close(context.Done)
didClose := make(chan struct{})
go func() {
input.Wait()
close(didClose)
}()

select {
case <-time.After(30 * time.Second):
t.Fatal("timeout waiting for beat to shut down")
case <-didClose:
}
}

func checkMatchingHeaders(
t *testing.T, event beat.Event, expected []sarama.RecordHeader,
) {
Expand Down