diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 06fa1bcf228..081f6bacac4 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -333,6 +333,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Add input httpjson. {issue}13545[13545] {pull}13546[13546]
 - Filebeat Netflow input: Remove beta label. {pull}13858[13858]
 - Remove `event.timezone` from events that don't need it in some modules that support log formats with and without timezones. {pull}13918[13918]
+- Add ExpandEventListFromField config option in the kafka input. {pull}13965[13965]
 
 *Heartbeat*
 
 - Add non-privileged icmp on linux and darwin(mac). {pull}13795[13795] {issue}11498[11498]
diff --git a/filebeat/docs/inputs/input-kafka.asciidoc b/filebeat/docs/inputs/input-kafka.asciidoc
index c40c2c69535..2c265fd974e 100644
--- a/filebeat/docs/inputs/input-kafka.asciidoc
+++ b/filebeat/docs/inputs/input-kafka.asciidoc
@@ -127,6 +127,17 @@ Kafka fetch settings:
 
 *`max`*:: The maximum number of bytes to read per request. Defaults to 0 (no limit).
 
+===== `expand_event_list_from_field`
+
+If the fileset using this input expects to receive multiple messages bundled under a specific field, set `expand_event_list_from_field` to the name of that field.
+For example, in the case of the Azure filesets the events are found under the JSON object `records`:
+```
+{
+  "records": [ {event1}, {event2} ]
+}
+```
+Setting `expand_event_list_from_field: records` splits each entry of the `records` array into a separate event.
+
 ===== `rebalance`
 
 Kafka rebalance settings:
diff --git a/filebeat/input/kafka/config.go b/filebeat/input/kafka/config.go
index ddc505bf4c7..6fb14730aea 100644
--- a/filebeat/input/kafka/config.go
+++ b/filebeat/input/kafka/config.go
@@ -34,22 +34,23 @@ import (
 
 type kafkaInputConfig struct {
 	// Kafka hosts with port, e.g. "localhost:9092"
"localhost:9092" - Hosts []string `config:"hosts" validate:"required"` - Topics []string `config:"topics" validate:"required"` - GroupID string `config:"group_id" validate:"required"` - ClientID string `config:"client_id"` - Version kafka.Version `config:"version"` - InitialOffset initialOffset `config:"initial_offset"` - ConnectBackoff time.Duration `config:"connect_backoff" validate:"min=0"` - ConsumeBackoff time.Duration `config:"consume_backoff" validate:"min=0"` - WaitClose time.Duration `config:"wait_close" validate:"min=0"` - MaxWaitTime time.Duration `config:"max_wait_time"` - IsolationLevel isolationLevel `config:"isolation_level"` - Fetch kafkaFetch `config:"fetch"` - Rebalance kafkaRebalance `config:"rebalance"` - TLS *tlscommon.Config `config:"ssl"` - Username string `config:"username"` - Password string `config:"password"` + Hosts []string `config:"hosts" validate:"required"` + Topics []string `config:"topics" validate:"required"` + GroupID string `config:"group_id" validate:"required"` + ClientID string `config:"client_id"` + Version kafka.Version `config:"version"` + InitialOffset initialOffset `config:"initial_offset"` + ConnectBackoff time.Duration `config:"connect_backoff" validate:"min=0"` + ConsumeBackoff time.Duration `config:"consume_backoff" validate:"min=0"` + WaitClose time.Duration `config:"wait_close" validate:"min=0"` + MaxWaitTime time.Duration `config:"max_wait_time"` + IsolationLevel isolationLevel `config:"isolation_level"` + Fetch kafkaFetch `config:"fetch"` + Rebalance kafkaRebalance `config:"rebalance"` + TLS *tlscommon.Config `config:"ssl"` + Username string `config:"username"` + Password string `config:"password"` + ExpandEventListFromField string `config:"expand_event_list_from_field"` } type kafkaFetch struct { diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index 98b7f15c1f0..6feb5cfd742 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -19,6 +19,7 @@ package kafka import ( "context" + "encoding/json" "fmt" "strings" "sync" @@ -107,6 +108,9 @@ func (input *kafkaInput) runConsumerGroup( handler := &groupHandler{ version: input.config.Version, outlet: input.outlet, + // expandEventListFromField will be assigned the configuration option expand_event_list_from_field + expandEventListFromField: input.config.ExpandEventListFromField, + log: input.log, } input.saramaWaitGroup.Add(1) @@ -234,6 +238,10 @@ type groupHandler struct { version kafka.Version session sarama.ConsumerGroupSession outlet channel.Outleter + // if the fileset using this input expects to receive multiple messages bundled under a specific field then this value is assigned + // ex. 
+	expandEventListFromField string
+	log                      *logp.Logger
 }
 
 // The metadata attached to incoming events so they can be ACKed once they've
@@ -243,11 +251,11 @@ type eventMeta struct {
 	message *sarama.ConsumerMessage
 }
 
-func (h *groupHandler) createEvent(
+func (h *groupHandler) createEvents(
 	sess sarama.ConsumerGroupSession,
 	claim sarama.ConsumerGroupClaim,
 	message *sarama.ConsumerMessage,
-) beat.Event {
+) []beat.Event {
 	timestamp := time.Now()
 	kafkaFields := common.MapStr{
 		"topic": claim.Topic(),
@@ -266,19 +274,31 @@ func (h *groupHandler) createEvent(
 	if versionOk && version.IsAtLeast(sarama.V0_11_0_0) {
 		kafkaFields["headers"] = arrayForKafkaHeaders(message.Headers)
 	}
-	event := beat.Event{
-		Timestamp: timestamp,
-		Fields: common.MapStr{
-			"message": string(message.Value),
-			"kafka":   kafkaFields,
-		},
-		Private: eventMeta{
-			handler: h,
-			message: message,
-		},
+
+	// if expand_event_list_from_field is set, split the incoming message into multiple events based on that field
+	var events []beat.Event
+	var messages []string
+	if h.expandEventListFromField == "" {
+		messages = []string{string(message.Value)}
+	} else {
+		messages = h.parseMultipleMessages(message.Value)
 	}
+	for _, msg := range messages {
+		event := beat.Event{
+			Timestamp: timestamp,
+			Fields: common.MapStr{
+				"message": msg,
+				"kafka":   kafkaFields,
+			},
+			Private: eventMeta{
+				handler: h,
+				message: message,
+			},
+		}
+		events = append(events, event)
-	return event
+	}
+	return events
 }
 
 func (h *groupHandler) Setup(session sarama.ConsumerGroupSession) error {
@@ -307,8 +327,31 @@ func (h *groupHandler) ack(message *sarama.ConsumerMessage) {
 func (h *groupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 	for msg := range claim.Messages() {
-		event := h.createEvent(sess, claim, msg)
-		h.outlet.OnEvent(event)
+		events := h.createEvents(sess, claim, msg)
+		for _, event := range events {
+			h.outlet.OnEvent(event)
+		}
 	}
 	return nil
 }
+
+// parseMultipleMessages will try to split the message into multiple ones based on the group field provided by the configuration
+func (h *groupHandler) parseMultipleMessages(bMessage []byte) []string {
+	var obj map[string][]interface{}
+	err := json.Unmarshal(bMessage, &obj)
+	if err != nil {
+		h.log.Errorw(fmt.Sprintf("Kafka deserializing multiple messages using the group object %s", h.expandEventListFromField), "error", err)
+		return []string{}
+	}
+	var messages []string
+	if len(obj[h.expandEventListFromField]) > 0 {
+		for _, ms := range obj[h.expandEventListFromField] {
+			js, err := json.Marshal(ms)
+			if err != nil {
+				h.log.Errorw(fmt.Sprintf("Kafka serializing message %s", ms), "error", err)
+				continue
+			}
+			messages = append(messages, string(js))
+		}
+	}
+	return messages
+}
diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go
index 71e44cf6219..02d7adcc075 100644
--- a/filebeat/input/kafka/kafka_integration_test.go
+++ b/filebeat/input/kafka/kafka_integration_test.go
@@ -178,6 +178,79 @@ func TestInput(t *testing.T) {
 	}
 }
 
+func TestInputWithMultipleEvents(t *testing.T) {
+	id := strconv.Itoa(rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int())
+	testTopic := fmt.Sprintf("Filebeat-TestInput-%s", id)
+	context := input.Context{
+		Done:     make(chan struct{}),
+		BeatDone: make(chan struct{}),
+	}
+
+	// Send test messages to the topic for the input to read.
+	message := testMessage{
+		message: "{\"records\": [{\"val\":\"val1\"}, {\"val\":\"val2\"}]}",
+		headers: []sarama.RecordHeader{
+			recordHeader("X-Test-Header", "test header value"),
+		},
+	}
+	writeToKafkaTopic(t, testTopic, message.message, message.headers, time.Second*20)
+
+	// Setup the input config
+	config := common.MustNewConfigFrom(common.MapStr{
+		"hosts":                        getTestKafkaHost(),
+		"topics":                       []string{testTopic},
+		"group_id":                     "filebeat",
+		"wait_close":                   0,
+		"expand_event_list_from_field": "records",
+	})
+
+	// Route input events through our capturer instead of sending through ES.
+	events := make(chan beat.Event, 100)
+	defer close(events)
+	capturer := NewEventCapturer(events)
+	defer capturer.Close()
+	connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) {
+		return channel.SubOutlet(capturer), nil
+	})
+
+	input, err := NewInput(config, connector, context)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Run the input and wait for finalization
+	input.Run()
+
+	timeout := time.After(30 * time.Second)
+	select {
+	case event := <-events:
+		text, err := event.Fields.GetValue("message")
+		if err != nil {
+			t.Fatal(err)
+		}
+		msgs := []string{"{\"val\":\"val1\"}", "{\"val\":\"val2\"}"}
+		assert.Contains(t, msgs, text)
+		checkMatchingHeaders(t, event, message.headers)
+	case <-timeout:
+		t.Fatal("timeout waiting for incoming events")
+	}
+
+	// Close the done channel and make sure the beat shuts down in a reasonable
+	// amount of time.
+	close(context.Done)
+	didClose := make(chan struct{})
+	go func() {
+		input.Wait()
+		close(didClose)
+	}()
+
+	select {
+	case <-time.After(30 * time.Second):
+		t.Fatal("timeout waiting for beat to shut down")
+	case <-didClose:
+	}
+}
+
 func checkMatchingHeaders(
 	t *testing.T, event beat.Event, expected []sarama.RecordHeader,
 ) {
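
For reference, here is a minimal standalone sketch (not part of the PR) of the splitting behaviour behind `expand_event_list_from_field`, using the same `records` payload as the integration test above. `splitEventList` is a hypothetical helper name chosen for illustration; it mirrors the logic of the new `parseMultipleMessages` method outside the input.
```
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// splitEventList looks up the configured field in the incoming JSON payload and
// re-serializes each array element so it can be published as its own "message".
func splitEventList(payload []byte, field string) ([]string, error) {
	var obj map[string][]interface{}
	if err := json.Unmarshal(payload, &obj); err != nil {
		return nil, fmt.Errorf("deserializing message using the group object %s: %w", field, err)
	}
	var messages []string
	for _, item := range obj[field] {
		js, err := json.Marshal(item)
		if err != nil {
			return nil, fmt.Errorf("serializing message %v: %w", item, err)
		}
		messages = append(messages, string(js))
	}
	return messages, nil
}

func main() {
	// The same payload shape used by the integration test and the Azure filesets.
	payload := []byte(`{"records": [{"val":"val1"}, {"val":"val2"}]}`)

	msgs, err := splitEventList(payload, "records")
	if err != nil {
		log.Fatal(err)
	}
	for _, m := range msgs {
		fmt.Println(m) // prints {"val":"val1"} and {"val":"val2"} on separate lines
	}
}
```
Configuring the kafka input with `expand_event_list_from_field: records` (alongside `hosts`, `topics`, and `group_id`, as in the test config above) produces the same two events through Filebeat, one per entry of the `records` array.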