diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e09fabe2693..aadf24c6a95 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -320,6 +320,9 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Upgrade k8s.io/client-go and k8s keystore tests. {pull}18817[18817] - Add support for multiple sets of hints on autodiscover {pull}18883[18883] - Add a configurable delay between retries when an app metadata cannot be retrieved by `add_cloudfoundry_metadata`. {pull}19181[19181] +- Add the `ignore_failure` configuration option to the dissect processor. {pull}19464[19464] +- Add the `overwrite_keys` configuration option to the dissect processor. {pull}19464[19464] +- Add support to trim captured values in the dissect processor. {pull}19464[19464] *Auditbeat* diff --git a/libbeat/processors/dissect/config.go b/libbeat/processors/dissect/config.go index d9541c2127e..4f6aa8e7434 100644 --- a/libbeat/processors/dissect/config.go +++ b/libbeat/processors/dissect/config.go @@ -17,15 +17,35 @@ package dissect +import ( + "strings" + + "github.com/pkg/errors" +) + +type trimMode byte + +const ( + trimModeNone trimMode = iota + trimModeRight + trimModeLeft + trimModeAll = trimModeRight | trimModeLeft +) + type config struct { - Tokenizer *tokenizer `config:"tokenizer" validate:"required"` - Field string `config:"field"` - TargetPrefix string `config:"target_prefix"` + Tokenizer *tokenizer `config:"tokenizer" validate:"required"` + Field string `config:"field"` + TargetPrefix string `config:"target_prefix"` + IgnoreFailure bool `config:"ignore_failure"` + OverwriteKeys bool `config:"overwrite_keys"` + TrimValues trimMode `config:"trim_values"` + TrimChars string `config:"trim_chars"` } var defaultConfig = config{ Field: "message", TargetPrefix: "dissect", + TrimChars: " ", } // tokenizer add validation at the unpack level for this specific field. 
@@ -40,3 +60,20 @@ func (t *tokenizer) Unpack(v string) error { *t = *d return nil } + +// Unpack the trim mode from a string. +func (tm *trimMode) Unpack(v string) error { + switch strings.ToLower(v) { + case "", "none": + *tm = trimModeNone + case "left": + *tm = trimModeLeft + case "right": + *tm = trimModeRight + case "all", "both": + *tm = trimModeAll + default: + return errors.Errorf("unsupported value %s. Must be one of [none, left, right, all]", v) + } + return nil +} diff --git a/libbeat/processors/dissect/config_test.go b/libbeat/processors/dissect/config_test.go index 09d4da5a180..5b08a15fe95 100644 --- a/libbeat/processors/dissect/config_test.go +++ b/libbeat/processors/dissect/config_test.go @@ -40,6 +40,7 @@ func TestConfig(t *testing.T) { if !assert.NoError(t, err) { return } + assert.Equal(t, trimModeNone, cfg.TrimValues) }) t.Run("invalid", func(t *testing.T) { @@ -100,4 +101,39 @@ func TestConfig(t *testing.T) { return } }) + + t.Run("with wrong trim_mode", func(t *testing.T) { + c, err := common.NewConfigFrom(map[string]interface{}{ + "tokenizer": "hello %{what}", + "field": "message", + "trim_values": "bananas", + }) + if !assert.NoError(t, err) { + return + } + + cfg := config{} + err = c.Unpack(&cfg) + if !assert.Error(t, err) { + return + } + }) + + t.Run("with valid trim_mode", func(t *testing.T) { + c, err := common.NewConfigFrom(map[string]interface{}{ + "tokenizer": "hello %{what}", + "field": "message", + "trim_values": "all", + }) + if !assert.NoError(t, err) { + return + } + + cfg := config{} + err = c.Unpack(&cfg) + if !assert.NoError(t, err) { + return + } + assert.Equal(t, trimModeAll, cfg.TrimValues) + }) } diff --git a/libbeat/processors/dissect/dissect.go b/libbeat/processors/dissect/dissect.go index 406027adfa3..bc9b0c75867 100644 --- a/libbeat/processors/dissect/dissect.go +++ b/libbeat/processors/dissect/dissect.go @@ -33,8 +33,9 @@ type position struct { // Dissector is a tokenizer based on the Dissect syntax as defined at: 
// https://www.elastic.co/guide/en/logstash/current/plugins-filters-dissect.html type Dissector struct { - raw string - parser *parser + raw string + parser *parser + trimmer trimmer } // Dissect takes the raw string and will use the defined tokenizer to return a map with the @@ -57,7 +58,12 @@ func (d *Dissector) Dissect(s string) (Map, error) { if len(positions) == 0 { return nil, errParsingFailure } - + if d.trimmer != nil { + for idx, pos := range positions { + pos.start, pos.end = d.trimmer.Trim(s, pos.start, pos.end) + positions[idx] = pos + } + } return d.resolve(s, positions), nil } diff --git a/libbeat/processors/dissect/docs/dissect.asciidoc b/libbeat/processors/dissect/docs/dissect.asciidoc index e11d8ed50b9..e8edaa822e1 100644 --- a/libbeat/processors/dissect/docs/dissect.asciidoc +++ b/libbeat/processors/dissect/docs/dissect.asciidoc @@ -25,7 +25,29 @@ The `dissect` processor has the following configuration settings: `target_prefix`:: (Optional) The name of the field where the values will be extracted. When an empty string is defined, the processor will create the keys at the root of the event. Default is `dissect`. When the target key already exists in the event, the processor won't replace it and log -an error; you need to either drop or rename the key before using dissect. +an error; you need to either drop or rename the key before using dissect, or +enable the `overwrite_keys` flag. + +`ignore_failure`:: (Optional) Flag to control whether the processor returns an error if the +tokenizer fails to match the message field. If set to true, the processor will silently restore +the original event, allowing execution of subsequent processors (if any). If set to false +(default), the processor will log an error, preventing execution of other processors. + +`overwrite_keys`:: (Optional) When set to true, the processor will overwrite +existing keys in the event. The default is false, which causes the processor +to fail when a key already exists. 
+ +`trim_values`:: (Optional) Enables the trimming of the extracted values. Useful +to remove leading and/or trailing spaces. Possible values are: +- `none`: (default) no trimming is performed. +- `left`: values are trimmed on the left (leading). +- `right`: values are trimmed on the right (trailing). +- `all`: values are trimmed for leading and trailing. + +`trim_chars`:: (Optional) Set of characters to trim from values, when trimming +is enabled. The default is to trim the space character (`" "`). To trim multiple +characters, simply set it to a string containing all characters to trim. For example, +`trim_chars: " \t"` will trim spaces and/or tabs. For tokenization to be successful, all keys must be found and extracted, if one of them cannot be found an error will be logged and no modification is done on the original event. diff --git a/libbeat/processors/dissect/processor.go b/libbeat/processors/dissect/processor.go index ac812e9ffae..746c86ba6c6 100644 --- a/libbeat/processors/dissect/processor.go +++ b/libbeat/processors/dissect/processor.go @@ -46,6 +46,14 @@ func NewProcessor(c *common.Config) (processors.Processor, error) { if err != nil { return nil, err } + if config.TrimValues != trimModeNone { + config.Tokenizer.trimmer, err = newTrimmer(config.TrimChars, + config.TrimValues&trimModeLeft != 0, + config.TrimValues&trimModeRight != 0) + if err != nil { + return nil, err + } + } p := &processor{config: config} return p, nil @@ -72,7 +80,9 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) { ); err != nil { return event, errors.Wrap(err, "cannot add new flag the event") } - + if p.config.IgnoreFailure { + return event, nil + } return event, err } @@ -94,7 +104,7 @@ func (p *processor) mapper(event *beat.Event, m common.MapStr) (*beat.Event, err var prefixKey string for k, v := range m { prefixKey = prefix + k - if _, err := event.GetValue(prefixKey); err == common.ErrKeyNotFound { + if _, err := event.GetValue(prefixKey); err == 
common.ErrKeyNotFound || p.config.OverwriteKeys { event.PutValue(prefixKey, v) } else { event.Fields = copy diff --git a/libbeat/processors/dissect/processor_test.go b/libbeat/processors/dissect/processor_test.go index 26a579ae699..919ec66bc90 100644 --- a/libbeat/processors/dissect/processor_test.go +++ b/libbeat/processors/dissect/processor_test.go @@ -18,6 +18,7 @@ package dissect import ( + "errors" "testing" "github.com/stretchr/testify/assert" @@ -75,6 +76,52 @@ func TestProcessor(t *testing.T) { fields: common.MapStr{"message": "hello world super", "extracted": common.MapStr{"not": "hello"}}, values: map[string]string{"extracted.key": "world", "extracted.key2": "super", "extracted.not": "hello"}, }, + { + name: "trimming trailing spaces", + c: map[string]interface{}{ + "tokenizer": "hello %{key} %{key2}", + "target_prefix": "", + "field": "message", + "trim_values": "right", + "trim_chars": " \t", + }, + fields: common.MapStr{"message": "hello world\t super "}, + values: map[string]string{"key": "world", "key2": "super"}, + }, + { + name: "not trimming by default", + c: map[string]interface{}{ + "tokenizer": "hello %{key} %{key2}", + "target_prefix": "", + "field": "message", + }, + fields: common.MapStr{"message": "hello world\t super "}, + values: map[string]string{"key": "world\t", "key2": "super "}, + }, + { + name: "trim leading space", + c: map[string]interface{}{ + "tokenizer": "hello %{key} %{key2}", + "target_prefix": "", + "field": "message", + "trim_values": "left", + "trim_chars": " \t", + }, + fields: common.MapStr{"message": "hello \tworld\t \tsuper "}, + values: map[string]string{"key": "world\t", "key2": "super "}, + }, + { + name: "trim all space", + c: map[string]interface{}{ + "tokenizer": "hello %{key} %{key2}", + "target_prefix": "", + "field": "message", + "trim_values": "all", + "trim_chars": " \t", + }, + fields: common.MapStr{"message": "hello \tworld\t \tsuper "}, + values: map[string]string{"key": "world", "key2": "super"}, 
+ }, } for _, test := range tests { @@ -232,3 +279,134 @@ func TestErrorFlagging(t *testing.T) { assert.Error(t, err) }) } + +func TestIgnoreFailure(t *testing.T) { + tests := []struct { + name string + c map[string]interface{} + msg string + err error + flags bool + }{ + { + name: "default is to fail", + c: map[string]interface{}{"tokenizer": "hello %{key}"}, + msg: "something completely different", + err: errors.New("could not find beginning delimiter: `hello ` in remaining: `something completely different`, (offset: 0)"), + flags: true, + }, + { + name: "ignore_failure is a noop on success", + c: map[string]interface{}{"tokenizer": "hello %{key}", "ignore_failure": true}, + msg: "hello world", + }, + { + name: "ignore_failure hides the error but maintains flags", + c: map[string]interface{}{"tokenizer": "hello %{key}", "ignore_failure": true}, + msg: "something completely different", + flags: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + c, err := common.NewConfigFrom(test.c) + if !assert.NoError(t, err) { + return + } + + processor, err := NewProcessor(c) + if !assert.NoError(t, err) { + return + } + + e := beat.Event{Fields: common.MapStr{"message": test.msg}} + event, err := processor.Run(&e) + if test.err == nil { + if !assert.NoError(t, err) { + return + } + } else { + if !assert.EqualError(t, err, test.err.Error()) { + return + } + } + flags, err := event.GetValue(beat.FlagField) + if test.flags { + if !assert.NoError(t, err) || !assert.Contains(t, flags, flagParsingError) { + return + } + } else { + if !assert.Error(t, err) { + return + } + } + }) + } +} + +func TestOverwriteKeys(t *testing.T) { + tests := []struct { + name string + c map[string]interface{} + fields common.MapStr + values common.MapStr + err error + }{ + { + name: "fail by default if key exists", + c: map[string]interface{}{"tokenizer": "hello %{key}", "target_prefix": ""}, + fields: common.MapStr{"message": "hello world", "key": 42}, + values: 
common.MapStr{"message": "hello world", "key": 42}, + err: errors.New("cannot override existing key with `key`"), + }, + { + name: "fail if key exists and overwrite disabled", + c: map[string]interface{}{"tokenizer": "hello %{key}", "target_prefix": "", "overwrite_keys": false}, + fields: common.MapStr{"message": "hello world", "key": 42}, + values: common.MapStr{"message": "hello world", "key": 42}, + err: errors.New("cannot override existing key with `key`"), + }, + { + name: "overwrite existing keys", + c: map[string]interface{}{"tokenizer": "hello %{key}", "target_prefix": "", "overwrite_keys": true}, + fields: common.MapStr{"message": "hello world", "key": 42}, + values: common.MapStr{"message": "hello world", "key": "world"}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + c, err := common.NewConfigFrom(test.c) + if !assert.NoError(t, err) { + return + } + + processor, err := NewProcessor(c) + if !assert.NoError(t, err) { + return + } + + e := beat.Event{Fields: test.fields} + event, err := processor.Run(&e) + if test.err == nil { + if !assert.NoError(t, err) { + return + } + } else { + if !assert.EqualError(t, err, test.err.Error()) { + return + } + } + + for field, value := range test.values { + v, err := event.GetValue(field) + if !assert.NoError(t, err) { + return + } + + assert.Equal(t, value, v) + } + }) + } +} diff --git a/libbeat/processors/dissect/trim.go b/libbeat/processors/dissect/trim.go new file mode 100644 index 00000000000..8224dcc8ffc --- /dev/null +++ b/libbeat/processors/dissect/trim.go @@ -0,0 +1,109 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
// asciiLimit is the first byte value outside the ASCII range. Bytes below
// this limit are valid indices into asciiTrimmer.chars.
const asciiLimit = 128

// trimmer narrows the bounds of a captured value, removing unwanted
// leading and/or trailing characters.
type trimmer interface {
	// Trim returns the new (start, end) bounds of s[start:end] after
	// trimming. It never reslices s; callers index with the result.
	Trim(s string, start, end int) (int, int)
}

// newTrimmer returns a trimmer for the given cutset. A fast table-based
// implementation is used when the cutset is pure ASCII; otherwise it falls
// back to a rune-aware implementation.
func newTrimmer(trimChars string, trimLeft, trimRight bool) (t trimmer, err error) {
	if t, err = newASCIITrimmer(trimChars, trimLeft, trimRight); err == errOnlyASCII {
		t, err = newUTF8Trimmer(trimChars, trimLeft, trimRight)
	}
	return t, err
}

// asciiTrimmer trims using a byte lookup table. Only valid when every
// character in the cutset is ASCII.
type asciiTrimmer struct {
	// chars[b] != 0 marks byte b as trimmable. Sized asciiLimit (128) so
	// that every accepted byte — including DEL (0x7F) — is a valid index.
	// (A [127]byte array would panic on index 127.)
	chars       [asciiLimit]byte
	left, right bool
}

// errOnlyASCII signals newTrimmer to fall back to the UTF-8 implementation.
var errOnlyASCII = errors.New("only trimming of ASCII characters is supported")

// newASCIITrimmer builds an asciiTrimmer from trimChars, or returns
// errOnlyASCII if the cutset contains any non-ASCII byte.
func newASCIITrimmer(trimChars string, trimLeft, trimRight bool) (trimmer, error) {
	t := asciiTrimmer{
		left:  trimLeft,
		right: trimRight,
	}
	for _, chr := range []byte(trimChars) {
		if chr >= asciiLimit {
			return t, errOnlyASCII
		}
		t.chars[chr] = 1
	}
	return t, nil
}

// Trim advances start past trimmable bytes (when left is enabled) and
// retreats end before trimmable bytes (when right is enabled). A non-ASCII
// byte always terminates the scan on its side.
func (t asciiTrimmer) Trim(s string, start, end int) (int, int) {
	if t.left {
		for ; start < end && s[start] < asciiLimit && t.chars[s[start]] != 0; start++ {
		}
	}
	if t.right {
		for ; start < end && s[end-1] < asciiLimit && t.chars[s[end-1]] != 0; end-- {
		}
	}
	return start, end
}

// utf8trimmer trims arbitrary (possibly multi-byte) characters.
type utf8trimmer struct {
	// fn reports whether a rune is NOT part of the cutset (i.e. must be kept).
	fn          func(rune) bool
	left, right bool
}

// newUTF8Trimmer builds a rune-aware trimmer for the given cutset. It never
// fails; the error return mirrors newASCIITrimmer's signature.
func newUTF8Trimmer(trimChars string, trimLeft, trimRight bool) (trimmer, error) {
	return utf8trimmer{
		fn: func(r rune) bool {
			return strings.IndexRune(trimChars, r) == -1
		},
		left:  trimLeft,
		right: trimRight,
	}, nil
}

// Trim narrows [start, end) using rune boundaries.
func (t utf8trimmer) Trim(s string, start, end int) (int, int) {
	if t.left {
		// Find the first rune not in the cutset.
		pos := strings.IndexFunc(s[start:end], t.fn)
		if pos == -1 {
			// Everything is trimmable: collapse to an empty span.
			return end, end
		}
		start += pos
	}
	if t.right {
		// Find the last rune not in the cutset.
		pos := strings.LastIndexFunc(s[start:end], t.fn)
		if pos == -1 {
			return start, start
		}
		// end must point just past the kept rune; the last character can be
		// more than one byte wide, so account for its encoded width.
		_, width := utf8.DecodeRuneInString(s[start+pos:])
		end = start + pos + width
	}
	return start, end
}
+ +package dissect + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +type stripCase struct { + s string + start, end int +} + +func makeStrings(n, l int) []stripCase { + cases := make([]stripCase, n) + for idx := range cases { + data := make([]byte, l) + start := (idx / 2) % l + end := l - ((idx+1)/2)%l + if end < start { + start, end = end, start + } + if start == end { + start, end = l, l + } + for i := 0; i < start; i++ { + data[i] = ' ' + } + for i := start; i < end; i++ { + data[i] = 'X' + } + for i := end; i < l; i++ { + data[i] = ' ' + } + cases[idx] = stripCase{string(data), start, end} + } + return cases +} + +func benchStrip(b *testing.B, l int, t trimmer) { + cases := makeStrings(b.N, l) + b.ResetTimer() + for idx, c := range cases { + start, end := t.Trim(c.s, 0, len(c.s)) + if start != c.start || end != c.end { + b.Logf("bad result idx=%d len=%d expected=(%d,%d) actual=(%d,%d)", + idx, len(c.s), c.start, c.end, start, end) + b.Fail() + } + } +} + +func benchStripASCII(b *testing.B, l int) { + trimmer, err := newASCIITrimmer(" ", true, true) + if !assert.NoError(b, err) { + b.Fail() + return + } + benchStrip(b, l, trimmer) +} + +func benchStripUTF8(b *testing.B, l int) { + trimmer, err := newUTF8Trimmer(" ", true, true) + if !assert.NoError(b, err) { + b.Fail() + return + } + benchStrip(b, l, trimmer) +} + +func BenchmarkStripASCII_4(b *testing.B) { + benchStripASCII(b, 4) +} + +func BenchmarkStripASCII_8(b *testing.B) { + benchStripASCII(b, 8) +} + +func BenchmarkStripASCII_32(b *testing.B) { + benchStripASCII(b, 32) +} + +func BenchmarkStripASCII_128(b *testing.B) { + benchStripASCII(b, 128) +} + +func BenchmarkStripASCII_512(b *testing.B) { + benchStripASCII(b, 512) +} + +func BenchmarkStripUTF8_4(b *testing.B) { + benchStripUTF8(b, 4) +} + +func BenchmarkStripUTF8_8(b *testing.B) { + benchStripUTF8(b, 8) +} + +func BenchmarkStripUTF8_32(b *testing.B) { + benchStripUTF8(b, 32) +} + +func BenchmarkStripUTF8_128(b 
*testing.B) { + benchStripUTF8(b, 128) +} + +func BenchmarkStripUTF8_512(b *testing.B) { + benchStripUTF8(b, 512) +} + +func TestTrimmer(t *testing.T) { + for _, test := range []struct { + name, cutset string + left, right bool + input, expected string + }{ + { + name: "single space right", + cutset: " ", + right: true, + input: " hello world! ", + expected: " hello world!", + }, + { + name: "noop right", + cutset: " ", + right: true, + input: " hello world!", + expected: " hello world!", + }, + { + name: "single space left", + cutset: " ", + left: true, + input: " hello world! ", + expected: "hello world! ", + }, + { + name: "noop left", + cutset: " ", + left: true, + input: "hello world! ", + expected: "hello world! ", + }, + { + name: "trim both", + cutset: " ", + left: true, + right: true, + input: " hello world! ", + expected: "hello world!", + }, + { + name: "non-space", + cutset: "h", + left: true, + right: true, + input: "hello world!", + expected: "ello world!", + }, + { + name: "multiple chars", + cutset: " \t_-", + left: true, + right: true, + input: "\t\t___here - -", + expected: "here", + }, + { + name: "empty string", + cutset: " \t_-", + left: true, + right: true, + input: "", + expected: "", + }, + { + name: "trim all", + cutset: " \t_-", + left: true, + right: true, + input: " \t__-", + expected: "", + }, + { + name: "trim UTF-8", + cutset: "฿นเผ„๐‘", + left: true, + right: true, + input: "เผ„๐‘…€฿น๊ง฿น๐‘", + expected: "๐‘…€฿น๊ง", + }, + { + name: "trim ASCII cutset in UTF-8 input", + cutset: " \t\rรฟ", + left: true, + right: true, + input: "\t\tเผ„๐‘…€฿น๊ง฿น๐‘ รฟ", + expected: "เผ„๐‘…€฿น๊ง฿น๐‘", + }, + } { + t.Run(test.name, func(t *testing.T) { + trimmer, err := newTrimmer(test.cutset, test.left, test.right) + if !assert.NoError(t, err) { + return + } + start, end := trimmer.Trim(test.input, 0, len(test.input)) + output := test.input[start:end] + assert.Equal(t, test.expected, output) + }) + } +}