diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 15bff0c29153..73a699bf29a3 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -130,6 +130,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff] - Add Beats Central Management {pull}8559[8559] - Allow Bus to buffer events in case listeners are not configured. {pull}8527[8527] - Enable `host` and `cloud` metadata processors by default. {pull}8596[8596] +- Dissect will now flag event on parsing error. {pull}8751[8751] *Auditbeat* diff --git a/libbeat/beat/event.go b/libbeat/beat/event.go index 8b25349eb1eb..1191dff23d8d 100644 --- a/libbeat/beat/event.go +++ b/libbeat/beat/event.go @@ -24,6 +24,9 @@ import ( "github.com/elastic/beats/libbeat/common" ) +// FlagField fields used to keep information or errors when events are parsed. +const FlagField = "log.flags" + // Event is the common event format shared by all beats. // Every event must have a timestamp and provide encodable Fields in `Fields`. // The `Meta`-fields can be used to pass additional meta-data to the outputs. diff --git a/libbeat/processors/dissect/processor.go b/libbeat/processors/dissect/processor.go index 613ea38dc104..eb6f29240b2c 100644 --- a/libbeat/processors/dissect/processor.go +++ b/libbeat/processors/dissect/processor.go @@ -27,6 +27,8 @@ import ( "github.com/elastic/beats/libbeat/processors" ) +const flagParsingError = "dissect_parsing_error" + type processor struct { config config } @@ -60,6 +62,14 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) { m, err := p.config.Tokenizer.Dissect(s) if err != nil { + if err := common.AddTagsWithKey( + event.Fields, + beat.FlagField, + []string{flagParsingError}, + ); err != nil { + return event, errors.Wrap(err, "cannot add new flag the event") + } + return event, err } diff --git a/libbeat/processors/dissect/processor_test.go b/libbeat/processors/dissect/processor_test.go index 132dae078729..7f7fbbb1b44d 100644 --- a/libbeat/processors/dissect/processor_test.go +++ b/libbeat/processors/dissect/processor_test.go @@ -176,3 +176,59 @@ func TestFieldAlreadyExist(t *testing.T) { }) } } + +func TestErrorFlagging(t *testing.T) { + t.Run("when the parsing fails add a flag", func(t *testing.T) { + c, err := common.NewConfigFrom(map[string]interface{}{ + "tokenizer": "%{ok} - %{notvalid}", + }) + + if !assert.NoError(t, err) { + return + } + + processor, err := newProcessor(c) + if !assert.NoError(t, err) { + return + } + + e := beat.Event{Fields: common.MapStr{"message": "hello world"}} + event, err := processor.Run(&e) + + if !assert.Error(t, err) { + return + } + + flags, err := event.GetValue(beat.FlagField) + if !assert.NoError(t, err) { + return + } + + assert.Contains(t, flags, flagParsingError) + }) + + t.Run("when the parsing is succesful do not add a flag", func(t *testing.T) { + c, err := common.NewConfigFrom(map[string]interface{}{ + "tokenizer": "%{ok} %{valid}", + }) + + if !assert.NoError(t, err) { + return + } + + processor, err := newProcessor(c) + if !assert.NoError(t, err) { + return + } + + e := beat.Event{Fields: common.MapStr{"message": "hello world"}} + event, err := processor.Run(&e) + + if !assert.NoError(t, err) { + return + } + + _, err = event.GetValue(beat.FlagField) + assert.Error(t, err) + }) +}