Skip to content

Commit

Permalink
Add tag "truncated" to "log.flags" if incoming line is longer than co…
Browse files Browse the repository at this point in the history
…nfigured limit (elastic#7991)

A new field is added to store the flags of an event named "log.flags".
If a message is truncated, "truncated" flag is added to the list.

Example event with "truncated" flag:

{
  "@timestamp": "2018-08-16T13:00:46.759Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "doc",
    "version": "7.0.0-alpha1"
  },
  "host": {
    "name": "sleipnir"
  },
  "source": "/home/n/test.log",
  "offset": 33,
  "log": {
    "flags": [
       "truncated"
    ],
  },
  "message": "test line",
  "prospector": {
    "type": "log"
  },
  "input": {
    "type": "log"
  },
  "beat": {
    "hostname": "sleipnir",
    "version": "7.0.0-alpha1",
    "name": "sleipnir"
  }
}

Closes elastic#7022
  • Loading branch information
kvch authored Aug 30, 2018
1 parent 30ac8e8 commit 0884236
Show file tree
Hide file tree
Showing 12 changed files with 316 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ The list below covers the major changes between 6.3.0 and master only.
coverage profile (.cov), and produces an HTML coverage report. See
`mage -h goTestUnit`. {pull}7766[7766]
- Beats packaging now build non-oss binaries from code located in the x-pack folder. {issue}7783[7783]
- New function `AddTagsWithKey` is added, so `common.MapStr` can be enriched with tags with an arbitrary key. {pull}7991[7991]
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
- Add custom unpack to log hints config to avoid env resolution {pull}7710[7710]
- Keep raw user agent information after parsing as user_agent_raw in Filebeat modules. {pull}7823[7832]
- Make docker input check if container strings are empty {pull}7960[7960]
- Add tag "truncated" to "log.flags" if incoming line is longer than configured limit. {pull}7991[7991]

*Heartbeat*

Expand Down
4 changes: 4 additions & 0 deletions filebeat/_meta/fields.common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@
description: >
Logging level.
- name: log.flags
description: >
This field contains the flags of the event.
- name: event.created
type: date
description: >
Expand Down
8 changes: 8 additions & 0 deletions filebeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -2682,6 +2682,14 @@ type: keyword
Logging level.
--
*`log.flags`*::
+
--
This field contains the flags of the event.
--
*`event.created`*::
Expand Down
2 changes: 1 addition & 1 deletion filebeat/include/fields.go

Large diffs are not rendered by default.

23 changes: 19 additions & 4 deletions filebeat/reader/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,28 @@ func (m *Message) IsEmpty() bool {
return false
}

func (msg *Message) AddFields(fields common.MapStr) {
// AddFields adds fields to the message.
func (m *Message) AddFields(fields common.MapStr) {
if fields == nil {
return
}

if msg.Fields == nil {
msg.Fields = common.MapStr{}
if m.Fields == nil {
m.Fields = common.MapStr{}
}
msg.Fields.Update(fields)
m.Fields.Update(fields)
}

// AddFlagsWithKey adds flags to the message with an arbitrary key.
// If the field does not exist, it is created.
func (m *Message) AddFlagsWithKey(key string, flags ...string) error {
if len(flags) == 0 {
return nil
}

if m.Fields == nil {
m.Fields = common.MapStr{}
}

return common.AddTagsWithKey(m.Fields, key, flags)
}
17 changes: 17 additions & 0 deletions filebeat/reader/multiline/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Reader struct {
separator []byte
last []byte
numLines int
truncated int
err error // last seen error
state func(*Reader) (reader.Message, error)
message reader.Message
Expand Down Expand Up @@ -262,13 +263,19 @@ func (mlr *Reader) clear() {
mlr.message = reader.Message{}
mlr.last = nil
mlr.numLines = 0
mlr.truncated = 0
mlr.err = nil
}

// finalize writes the existing content into the returned message and resets all reader variables.
func (mlr *Reader) finalize() reader.Message {
if mlr.truncated > 0 {
mlr.message.AddFlagsWithKey("log.flags", "truncated")
}

// Copy message from existing content
msg := mlr.message

mlr.clear()
return msg
}
Expand Down Expand Up @@ -303,6 +310,16 @@ func (mlr *Reader) addLine(m reader.Message) {
}
mlr.message.Content = append(tmp, m.Content[:space]...)
mlr.numLines++

// add number of truncated bytes to fields
diff := len(m.Content) - space
if diff > 0 {
mlr.truncated += diff
}
} else {
// increase the number of skipped bytes, if cannot add
mlr.truncated += len(m.Content)

}

mlr.last = m.Content
Expand Down
83 changes: 83 additions & 0 deletions filebeat/reader/multiline/multiline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,41 @@ func TestMultilineBeforeNegateOKWithEmptyLine(t *testing.T) {
)
}

func TestMultilineAfterTruncated(t *testing.T) {
pattern := match.MustCompile(`^[ ]`) // next line is indented a space
maxLines := 2
testMultilineTruncated(t,
Config{
Pattern: &pattern,
Match: "after",
MaxLines: &maxLines,
},
2,
true,
[]string{
"line1\n line1.1\n line1.2\n",
"line2\n line2.1\n line2.2\n"},
[]string{
"line1\n line1.1",
"line2\n line2.1"},
)
testMultilineTruncated(t,
Config{
Pattern: &pattern,
Match: "after",
MaxLines: &maxLines,
},
2,
false,
[]string{
"line1\n line1.1\n",
"line2\n line2.1\n"},
[]string{
"line1\n line1.1",
"line2\n line2.1"},
)
}

func testMultilineOK(t *testing.T, cfg Config, events int, expected ...string) {
_, buf := createLineBuffer(expected...)
r := createMultilineTestReader(t, buf, cfg)
Expand Down Expand Up @@ -177,6 +212,54 @@ func testMultilineOK(t *testing.T, cfg Config, events int, expected ...string) {
}
}

func testMultilineTruncated(t *testing.T, cfg Config, events int, truncated bool, input, expected []string) {
_, buf := createLineBuffer(input...)
r := createMultilineTestReader(t, buf, cfg)

var messages []reader.Message
for {
message, err := r.Next()
if err != nil {
break
}

messages = append(messages, message)
}

if len(messages) != events {
t.Fatalf("expected %v lines, read only %v line(s)", len(expected), len(messages))
}

for _, message := range messages {
found := false
statusFlags, err := message.Fields.GetValue("log.flags")
if err != nil {
if !truncated {
assert.False(t, found)
return
}
t.Fatalf("error while getting log.status field: %v", err)
}

switch flags := statusFlags.(type) {
case []string:
for _, f := range flags {
if f == "truncated" {
found = true
}
}
default:
t.Fatalf("incorrect type for log.flags")
}

if truncated {
assert.True(t, found)
} else {
assert.False(t, found)
}
}
}

func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg Config) reader.Reader {
encFactory, ok := encoding.FindEncoding("plain")
if !ok {
Expand Down
1 change: 1 addition & 0 deletions filebeat/reader/readfile/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func (r *LimitReader) Next() (reader.Message, error) {
message, err := r.reader.Next()
if len(message.Content) > r.maxBytes {
message.Content = message.Content[:r.maxBytes]
message.AddFlagsWithKey("log.flags", "truncated")
}
return message, err
}
88 changes: 88 additions & 0 deletions filebeat/reader/readfile/limit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build !integration

package readfile

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/filebeat/reader"
)

type mockReader struct {
line []byte
}

func (m *mockReader) Next() (reader.Message, error) {
return reader.Message{
Content: m.line,
}, nil
}

var limitTests = []struct {
line string
maxBytes int
truncated bool
}{
{"long-long-line", 5, true},
{"long-long-line", 3, true},
{"long-long-line", len("long-long-line"), false},
}

func TestLimitReader(t *testing.T) {
for _, test := range limitTests {
r := NewLimitReader(&mockReader{[]byte(test.line)}, test.maxBytes)

msg, err := r.Next()
if err != nil {
t.Fatalf("Error reading from mock reader: %v", err)
}

assert.Equal(t, test.maxBytes, len(msg.Content))

found := false
statusFlags, err := msg.Fields.GetValue("log.flags")
if err != nil {
if !test.truncated {
assert.False(t, found)
return
}
t.Fatalf("Error getting truncated value: %v", err)
}

switch flags := statusFlags.(type) {
case []string:
for _, f := range flags {
if f == "truncated" {
found = true
}
}
default:
t.Fatalf("incorrect type for log.flags")
}

if test.truncated {
assert.True(t, found)
} else {
assert.False(t, found)
}
}
}
27 changes: 20 additions & 7 deletions libbeat/common/mapstr.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,25 +305,38 @@ func MergeFields(ms, fields MapStr, underRoot bool) error {
// exist then it will be created. If the tags field exists and is not a []string
// then an error will be returned. It does not deduplicate the list of tags.
func AddTags(ms MapStr, tags []string) error {
return AddTagsWithKey(ms, TagsKey, tags)
}

// AddTagsWithKey appends a tag to the key field of ms. If the field does not
// exist then it will be created. If the field exists and is not a []string
// then an error will be returned. It does not deduplicate the list.
func AddTagsWithKey(ms MapStr, key string, tags []string) error {
if ms == nil || len(tags) == 0 {
return nil
}
eventTags, exists := ms[TagsKey]
if !exists {
ms[TagsKey] = tags

k, subMap, oldTags, present, err := mapFind(key, ms, true)
if err != nil {
return err
}

if !present {
subMap[k] = tags
return nil
}

switch arr := eventTags.(type) {
switch arr := oldTags.(type) {
case []string:
ms[TagsKey] = append(arr, tags...)
subMap[k] = append(arr, tags...)
case []interface{}:
for _, tag := range tags {
arr = append(arr, tag)
}
ms[TagsKey] = arr
subMap[k] = arr
default:
return errors.Errorf("expected string array by type is %T", eventTags)
return errors.Errorf("expected string array by type is %T", oldTags)

}
return nil
}
Expand Down
Loading

0 comments on commit 0884236

Please sign in to comment.