From 15de94d3310f7690aa5a4dfd8a7ec87ed24718d8 Mon Sep 17 00:00:00 2001 From: WinIT23 Date: Sun, 16 Oct 2022 15:50:33 +0530 Subject: [PATCH 1/9] initial draft for append processor. --- libbeat/processors/actions/append.go | 127 +++++++++++++++++++++++++++ 1 file changed, 127 insertions(+) create mode 100644 libbeat/processors/actions/append.go diff --git a/libbeat/processors/actions/append.go b/libbeat/processors/actions/append.go new file mode 100644 index 00000000000..48273fdece7 --- /dev/null +++ b/libbeat/processors/actions/append.go @@ -0,0 +1,127 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package actions + +import ( + "fmt" + + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/processors" + "github.com/elastic/beats/v7/libbeat/processors/checks" + jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +type appendProcessor struct { + config appendConfig + logger *logp.Logger +} + +type appendConfig struct { + Fields []string `config:"fields"` + TargetField string `config:"target_field"` + Values []string `config:"values"` + IgnoreMissing bool `config:"ignore_missing"` + IgnoreEmptyValues bool `config:"ignore_empty_values"` + FailOnError bool `config:"fail_on_error"` + AllowDuplicate bool `config:"allow_duplicate"` +} + +func init() { + processors.RegisterPlugin("append_processor", + checks.ConfigChecked(NewAppend, + checks.RequireFields("target_field"), + ), + ) + jsprocessor.RegisterPlugin("Append", NewAppend) +} + +// NewAppend returns a new append_processor processor. 
+func NewAppend(c *conf.C) (processors.Processor, error) { + config := appendConfig{ + IgnoreMissing: false, + IgnoreEmptyValues: false, + FailOnError: true, + AllowDuplicate: true, + } + err := c.Unpack(&config) + if err != nil { + return nil, fmt.Errorf("failed to unpack the configuration of append processor: %s", err) + } + + f := &appendProcessor{ + config: config, + logger: logp.NewLogger("append_processor"), + } + return f, nil +} + +func (f *appendProcessor) Run(event *beat.Event) (*beat.Event, error) { + var backup *beat.Event + if f.config.FailOnError { + backup = event.Clone() + } + + err := f.appendValues(f.config.TargetField, f.config.Fields, f.config.Values, event) + if err != nil { + errMsg := fmt.Errorf("failed to append fields in append_processor processor: %s", err) + f.logger.Debug(errMsg.Error()) + if f.config.FailOnError { + event = backup + event.PutValue("error.message", errMsg.Error()) + return event, err + } + } + + return event, nil +} + +func (f *appendProcessor) appendValues(target string, fields []string, values []string, event *beat.Event) error { + var arr []interface{} + + val, err := event.GetValue(target) + if err == nil { + return fmt.Errorf("could not fetch value for key: %s, Error: %s", target, err) + } + arr = append(arr, val) + + for _, field := range fields { + val, err := event.GetValue(field) + if err == nil { + if f.config.IgnoreMissing && errors.Is(err, mapstr.ErrKeyNotFound) { + return nil + } + return fmt.Errorf("could not fetch value for key: %s, Error: %s", field, err) + } + arr = append(arr, val) + } + + arr = append(arr, values) + + event.Delete(target) + event.PutValue(target, arr) + return nil +} + +func (f *appendProcessor) String() string { + return "append_processor=" + fmt.Sprintf("%+v", f.config.Fields) +} From 98e3831555469f78103be6e69c1338178f9d3127 Mon Sep 17 00:00:00 2001 From: WinIT23 Date: Sun, 16 Oct 2022 16:56:26 +0530 Subject: [PATCH 2/9] Added Unit tests and handled some corner cases. 
--- libbeat/processors/actions/append.go | 59 ++-- libbeat/processors/actions/append_test.go | 315 ++++++++++++++++++++++ 2 files changed, 358 insertions(+), 16 deletions(-) create mode 100644 libbeat/processors/actions/append_test.go diff --git a/libbeat/processors/actions/append.go b/libbeat/processors/actions/append.go index 48273fdece7..960c255d4a3 100644 --- a/libbeat/processors/actions/append.go +++ b/libbeat/processors/actions/append.go @@ -37,13 +37,13 @@ type appendProcessor struct { } type appendConfig struct { - Fields []string `config:"fields"` - TargetField string `config:"target_field"` - Values []string `config:"values"` - IgnoreMissing bool `config:"ignore_missing"` - IgnoreEmptyValues bool `config:"ignore_empty_values"` - FailOnError bool `config:"fail_on_error"` - AllowDuplicate bool `config:"allow_duplicate"` + Fields []string `config:"fields"` + TargetField string `config:"target_field"` + Values []interface{} `config:"values"` + IgnoreMissing bool `config:"ignore_missing"` + IgnoreEmptyValues bool `config:"ignore_empty_values"` + FailOnError bool `config:"fail_on_error"` + AllowDuplicate bool `config:"allow_duplicate"` //TODO: Add functionality to remove duplicate } func init() { @@ -95,27 +95,44 @@ func (f *appendProcessor) Run(event *beat.Event) (*beat.Event, error) { return event, nil } -func (f *appendProcessor) appendValues(target string, fields []string, values []string, event *beat.Event) error { +func (f *appendProcessor) appendValues(target string, fields []string, values []interface{}, event *beat.Event) error { var arr []interface{} val, err := event.GetValue(target) - if err == nil { - return fmt.Errorf("could not fetch value for key: %s, Error: %s", target, err) + if err != nil { + f.logger.Debugf("could not fetch value for key: %s. 
all the values will be appended in a new key %s.", target, target) + } else { + arr = append(arr, val) } - arr = append(arr, val) for _, field := range fields { + val, err := event.GetValue(field) - if err == nil { + if err != nil { if f.config.IgnoreMissing && errors.Is(err, mapstr.ErrKeyNotFound) { - return nil + continue } return fmt.Errorf("could not fetch value for key: %s, Error: %s", field, err) } - arr = append(arr, val) + + if f.config.IgnoreMissing && errors.Is(err, mapstr.ErrKeyNotFound) { + continue + } + + valArr, ok := val.([]interface{}) + if ok { + arr = append(arr, valArr...) + } else { + arr = append(arr, val) + } } - arr = append(arr, values) + arr = append(arr, values...) + + // remove empty strings and nil from the array + if f.config.IgnoreEmptyValues { + arr = cleanEmptyValues(arr) + } event.Delete(target) event.PutValue(target, arr) @@ -123,5 +140,15 @@ func (f *appendProcessor) appendValues(target string, fields []string, values [] } func (f *appendProcessor) String() string { - return "append_processor=" + fmt.Sprintf("%+v", f.config.Fields) + return "append_processor=" + fmt.Sprintf("%+v", f.config.TargetField) +} + +func cleanEmptyValues(dirtyArr []interface{}) (cleanArr []interface{}) { + for _, val := range dirtyArr { + if val == "" || val == nil { + continue + } + cleanArr = append(cleanArr, val) + } + return cleanArr } diff --git a/libbeat/processors/actions/append_test.go b/libbeat/processors/actions/append_test.go new file mode 100644 index 00000000000..9d945907143 --- /dev/null +++ b/libbeat/processors/actions/append_test.go @@ -0,0 +1,315 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package actions + +import ( + "reflect" + "testing" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +var log = logp.NewLogger("append_test") + +func Test_cleanEmptyValues(t *testing.T) { + type args struct { + dirtyArr []interface{} + } + tests := []struct { + description string + args args + wantCleanArr []interface{} + }{ + { + description: "array with empty values", + args: args{ + dirtyArr: []interface{}{"asdf", "", 12, "", nil}, + }, + wantCleanArr: []interface{}{"asdf", 12}, + }, + { + description: "array with no empty values", + args: args{ + dirtyArr: []interface{}{"asdf", "asd", 12, 123}, + }, + wantCleanArr: []interface{}{"asdf", "asd", 12, 123}, + }, + } + for _, tt := range tests { + t.Run(tt.description, func(t *testing.T) { + if gotCleanArr := cleanEmptyValues(tt.args.dirtyArr); !reflect.DeepEqual(gotCleanArr, tt.wantCleanArr) { + t.Errorf("cleanEmptyValues() = %v, want %v", gotCleanArr, tt.wantCleanArr) + } + }) + } +} + +func Test_appendProcessor_appendValues(t *testing.T) { + type fields struct { + config appendConfig + logger *logp.Logger + } + type args struct { + target string + fields []string + values []interface{} + event *beat.Event + } + tests := []struct { + description string + fields fields + args args + wantErr bool + }{ + { + description: "append value in the arrays from a field when target_field is present", + args: args{ + target: "target", + fields: []string{"field"}, + event: &beat.Event{ + Meta: mapstr.M{}, + 
Fields: mapstr.M{ + "target": []interface{}{"value1", "value2"}, + "field": "value", + }, + }, + }, + fields: fields{ + logger: log, + config: appendConfig{ + Fields: []string{"field"}, + TargetField: "target", + }, + }, + wantErr: false, + }, + { + description: "append value in the arrays from a field when target_field is not present", + args: args{ + target: "target", + fields: []string{"field"}, + event: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "field": "value", + }, + }, + }, + fields: fields{ + logger: log, + config: appendConfig{ + Fields: []string{"field"}, + TargetField: "target", + }, + }, + wantErr: false, + }, + { + description: "append value in the arrays from an unknown field", + args: args{ + target: "target", + fields: []string{"some-field"}, + event: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "field": "value", + }, + }, + }, + fields: fields{ + logger: log, + config: appendConfig{ + IgnoreEmptyValues: false, + IgnoreMissing: false, + AllowDuplicate: true, + FailOnError: true, + }, + }, + wantErr: true, + }, + { + description: "append value in the arrays from an unknown field with 'ignore_missing: true'", + args: args{ + target: "target", + fields: []string{"some-field"}, + event: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "field": "value", + }, + }, + }, + fields: fields{ + logger: log, + config: appendConfig{ + IgnoreEmptyValues: false, + IgnoreMissing: true, + AllowDuplicate: true, + FailOnError: true, + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.description, func(t *testing.T) { + f := &appendProcessor{ + config: tt.fields.config, + logger: tt.fields.logger, + } + if err := f.appendValues(tt.args.target, tt.args.fields, tt.args.values, tt.args.event); (err != nil) != tt.wantErr { + t.Errorf("appendProcessor.appendValues() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func Test_appendProcessor_Run(t *testing.T) { + type fields struct { + config appendConfig + 
logger *logp.Logger + } + type args struct { + event *beat.Event + } + tests := []struct { + description string + fields fields + args args + want *beat.Event + wantErr bool + }{ + { + description: "positive flow", + fields: fields{ + logger: log, + config: appendConfig{ + Fields: []string{"array-one", "array-two", "concrete-field"}, + TargetField: "target", + Values: []interface{}{"value1", "value2"}, + IgnoreMissing: false, + IgnoreEmptyValues: false, + FailOnError: true, + AllowDuplicate: true, + }, + }, + args: args{ + event: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "concrete-field": "some-value", + "array-one": []interface{}{"one", "", "two", "three"}, + "array-two": []interface{}{"four", "five", ""}, + }, + }, + }, + wantErr: false, + want: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "concrete-field": "some-value", + "array-one": []interface{}{"one", "", "two", "three"}, + "array-two": []interface{}{"four", "five", ""}, + "target": []interface{}{"one", "", "two", "three", "four", "five", "", "some-value", "value1", "value2"}, + }, + }, + }, + { + description: "remove empty values form output - 'ignore_empty_values: true'", + fields: fields{ + logger: log, + config: appendConfig{ + Fields: []string{"array-one", "array-two", "concrete-field"}, + TargetField: "target", + Values: []interface{}{"value1", nil, "value2", "", nil}, + IgnoreMissing: false, + IgnoreEmptyValues: true, + FailOnError: true, + AllowDuplicate: true, + }, + }, + args: args{ + event: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "concrete-field": "", + "array-one": []interface{}{"one", "", "two", "three"}, + "array-two": []interface{}{"four", "five", ""}, + }, + }, + }, + wantErr: false, + want: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "concrete-field": "", + "array-one": []interface{}{"one", "", "two", "three"}, + "array-two": []interface{}{"four", "five", ""}, + "target": []interface{}{"one", "two", "three", "four", "five", "value1", 
"value2"}, + }, + }, + }, + { + description: "append value of a missing field with 'ignore_missing: false'", + fields: fields{ + logger: log, + config: appendConfig{ + Fields: []string{"missing-field"}, + TargetField: "target", + IgnoreMissing: false, + IgnoreEmptyValues: false, + FailOnError: true, + AllowDuplicate: true, + }, + }, + args: args{ + event: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{}, + }, + }, + wantErr: true, + want: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "error": mapstr.M{ + "message": "failed to append fields in append_processor processor: could not fetch value for key: missing-field, Error: key not found", + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.description, func(t *testing.T) { + f := &appendProcessor{ + config: tt.fields.config, + logger: tt.fields.logger, + } + got, err := f.Run(tt.args.event) + if (err != nil) != tt.wantErr { + t.Errorf("appendProcessor.Run() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("appendProcessor.Run() = %v, want %v", got, tt.want) + } + }) + } +} From 379d304005c0994137fc27663f2d329e64c1b759 Mon Sep 17 00:00:00 2001 From: WinIT23 Date: Sun, 16 Oct 2022 17:09:39 +0530 Subject: [PATCH 3/9] Added unit test for nested fields. 
--- libbeat/processors/actions/append_test.go | 39 +++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/libbeat/processors/actions/append_test.go b/libbeat/processors/actions/append_test.go index 9d945907143..61abac4c5c5 100644 --- a/libbeat/processors/actions/append_test.go +++ b/libbeat/processors/actions/append_test.go @@ -231,6 +231,45 @@ func Test_appendProcessor_Run(t *testing.T) { }, }, }, + { + description: "test for nested field", + fields: fields{ + logger: log, + config: appendConfig{ + Fields: []string{"array.one", "array.two", "concrete-field"}, + TargetField: "target", + Values: []interface{}{"value1", "value2"}, + IgnoreMissing: false, + IgnoreEmptyValues: false, + FailOnError: true, + AllowDuplicate: true, + }, + }, + args: args{ + event: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "concrete-field": "some-value", + "array": mapstr.M{ + "one": []interface{}{"one", "", "two", "three"}, + "two": []interface{}{"four", "five", ""}, + }, + }, + }, + }, + wantErr: false, + want: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "concrete-field": "some-value", + "array": mapstr.M{ + "one": []interface{}{"one", "", "two", "three"}, + "two": []interface{}{"four", "five", ""}, + }, + "target": []interface{}{"one", "", "two", "three", "four", "five", "", "some-value", "value1", "value2"}, + }, + }, + }, { description: "remove empty values form output - 'ignore_empty_values: true'", fields: fields{ From 7b49c7c4a527e3e682ec34b0bb7728b7c84dd39d Mon Sep 17 00:00:00 2001 From: WinIT23 Date: Sun, 16 Oct 2022 17:59:18 +0530 Subject: [PATCH 4/9] Added asciidoc for the processor. 
--- .../processors/actions/docs/append.asciidoc | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 libbeat/processors/actions/docs/append.asciidoc diff --git a/libbeat/processors/actions/docs/append.asciidoc b/libbeat/processors/actions/docs/append.asciidoc new file mode 100644 index 00000000000..96a73a339ad --- /dev/null +++ b/libbeat/processors/actions/docs/append.asciidoc @@ -0,0 +1,54 @@ +[[append]] +=== Append Processor + +++++ +append_processor +++++ + +The `append_processor` processor appends static values and values from the listed fields to target field. + +`target_field`:: The field in which you want to append the data. +`fields`:: () List of fields from which you want to copy data from. If the value is of a concrete type it will be appended directly to the target. +However, if the value is an array, all the elements of the array are pushed individually to the target field. +`values`:: (Optional) List of static values you want to append to target field. +`ignore_empty_values`:: (Optional) If set to `true`, all the `""` and `nil` are omitted from being appended to the target field. +`fail_on_error`:: (Optional) If set to `true` and an error occurs, the changes are reverted and the original is returned. If set to `false`, +processing continues if an error occurs. Default is `true`. +`ignore_missing`:: (Optional) Indicates whether to ignore events that lack the source + field. The default is `false`, which will fail processing of + an event if a field is missing. + +note: If you want to use `fields` parameter with fields under `message`, make sure you use `decode_json_fields` first with `target: ""`. 
+ +For example, this configuration: + +[source,yaml] +------------------------------------------------------------------------------ +processors: + - decode_json_fields: + fields: message + target: "" + - append_processor: + target_field: my-target-field + fields: + - concrete.field + - array.one + values: + - static-value + - "" + ignore_missing: true + fail_on_error: true + ignore_empty_values: true +------------------------------------------------------------------------------ + +Copies the values of `concrete.field`, `array.one` from response field to `my-target-field`: + +[source,json] +------------------------------------------------------------------------------- +{ + "message": "my-interesting-message", + "event": { + "original": "my-interesting-message" + } +} +------------------------------------------------------------------------------- From 020214affd20f05cabfd52eb80bebd1586e36953 Mon Sep 17 00:00:00 2001 From: WinIT23 Date: Mon, 17 Oct 2022 04:03:13 +0530 Subject: [PATCH 5/9] Fixed linting issues. 
--- libbeat/processors/actions/append.go | 38 +++++++++++++---------- libbeat/processors/actions/append_test.go | 20 ++++++------ 2 files changed, 31 insertions(+), 27 deletions(-) diff --git a/libbeat/processors/actions/append.go b/libbeat/processors/actions/append.go index 960c255d4a3..a08e7af7439 100644 --- a/libbeat/processors/actions/append.go +++ b/libbeat/processors/actions/append.go @@ -32,11 +32,11 @@ import ( ) type appendProcessor struct { - config appendConfig + config appendProcessorConfig logger *logp.Logger } -type appendConfig struct { +type appendProcessorConfig struct { Fields []string `config:"fields"` TargetField string `config:"target_field"` Values []interface{} `config:"values"` @@ -48,16 +48,16 @@ type appendConfig struct { func init() { processors.RegisterPlugin("append_processor", - checks.ConfigChecked(NewAppend, + checks.ConfigChecked(NewAppendProcessor, checks.RequireFields("target_field"), ), ) - jsprocessor.RegisterPlugin("Append", NewAppend) + jsprocessor.RegisterPlugin("AppendProcessor", NewAppendProcessor) } -// NewAppend returns a new append_processor processor. -func NewAppend(c *conf.C) (processors.Processor, error) { - config := appendConfig{ +// NewAppendProcessor returns a new append_processor processor. 
+func NewAppendProcessor(c *conf.C) (processors.Processor, error) { + config := appendProcessorConfig{ IgnoreMissing: false, IgnoreEmptyValues: false, FailOnError: true, @@ -65,7 +65,7 @@ func NewAppend(c *conf.C) (processors.Processor, error) { } err := c.Unpack(&config) if err != nil { - return nil, fmt.Errorf("failed to unpack the configuration of append processor: %s", err) + return nil, fmt.Errorf("failed to unpack the configuration of append processor: %w", err) } f := &appendProcessor{ @@ -83,11 +83,13 @@ func (f *appendProcessor) Run(event *beat.Event) (*beat.Event, error) { err := f.appendValues(f.config.TargetField, f.config.Fields, f.config.Values, event) if err != nil { - errMsg := fmt.Errorf("failed to append fields in append_processor processor: %s", err) + errMsg := fmt.Errorf("failed to append fields in append_processor processor: %w", err) f.logger.Debug(errMsg.Error()) if f.config.FailOnError { event = backup - event.PutValue("error.message", errMsg.Error()) + if _, err := event.PutValue("error.message", errMsg.Error()); err != nil { + return nil, fmt.Errorf("failed to append fields in append_processor processor: %w", err) + } return event, err } } @@ -112,11 +114,7 @@ func (f *appendProcessor) appendValues(target string, fields []string, values [] if f.config.IgnoreMissing && errors.Is(err, mapstr.ErrKeyNotFound) { continue } - return fmt.Errorf("could not fetch value for key: %s, Error: %s", field, err) - } - - if f.config.IgnoreMissing && errors.Is(err, mapstr.ErrKeyNotFound) { - continue + return fmt.Errorf("could not fetch value for key: %s, Error: %w", field, err) } valArr, ok := val.([]interface{}) @@ -134,8 +132,14 @@ func (f *appendProcessor) appendValues(target string, fields []string, values [] arr = cleanEmptyValues(arr) } - event.Delete(target) - event.PutValue(target, arr) + if err := event.Delete(target); err != nil && !errors.Is(err, mapstr.ErrKeyNotFound) { + return fmt.Errorf("unable to delete the target field %s due to error: 
%w", target, err) + } + + if _, err := event.PutValue(target, arr); err != nil { + return fmt.Errorf("unable to put values in the target field %s due to error: %w", target, err) + } + return nil } diff --git a/libbeat/processors/actions/append_test.go b/libbeat/processors/actions/append_test.go index 61abac4c5c5..9b7cf61980b 100644 --- a/libbeat/processors/actions/append_test.go +++ b/libbeat/processors/actions/append_test.go @@ -63,7 +63,7 @@ func Test_cleanEmptyValues(t *testing.T) { func Test_appendProcessor_appendValues(t *testing.T) { type fields struct { - config appendConfig + config appendProcessorConfig logger *logp.Logger } type args struct { @@ -93,7 +93,7 @@ func Test_appendProcessor_appendValues(t *testing.T) { }, fields: fields{ logger: log, - config: appendConfig{ + config: appendProcessorConfig{ Fields: []string{"field"}, TargetField: "target", }, @@ -114,7 +114,7 @@ func Test_appendProcessor_appendValues(t *testing.T) { }, fields: fields{ logger: log, - config: appendConfig{ + config: appendProcessorConfig{ Fields: []string{"field"}, TargetField: "target", }, @@ -135,7 +135,7 @@ func Test_appendProcessor_appendValues(t *testing.T) { }, fields: fields{ logger: log, - config: appendConfig{ + config: appendProcessorConfig{ IgnoreEmptyValues: false, IgnoreMissing: false, AllowDuplicate: true, @@ -158,7 +158,7 @@ func Test_appendProcessor_appendValues(t *testing.T) { }, fields: fields{ logger: log, - config: appendConfig{ + config: appendProcessorConfig{ IgnoreEmptyValues: false, IgnoreMissing: true, AllowDuplicate: true, @@ -183,7 +183,7 @@ func Test_appendProcessor_appendValues(t *testing.T) { func Test_appendProcessor_Run(t *testing.T) { type fields struct { - config appendConfig + config appendProcessorConfig logger *logp.Logger } type args struct { @@ -200,7 +200,7 @@ func Test_appendProcessor_Run(t *testing.T) { description: "positive flow", fields: fields{ logger: log, - config: appendConfig{ + config: appendProcessorConfig{ Fields: 
[]string{"array-one", "array-two", "concrete-field"}, TargetField: "target", Values: []interface{}{"value1", "value2"}, @@ -235,7 +235,7 @@ func Test_appendProcessor_Run(t *testing.T) { description: "test for nested field", fields: fields{ logger: log, - config: appendConfig{ + config: appendProcessorConfig{ Fields: []string{"array.one", "array.two", "concrete-field"}, TargetField: "target", Values: []interface{}{"value1", "value2"}, @@ -274,7 +274,7 @@ func Test_appendProcessor_Run(t *testing.T) { description: "remove empty values form output - 'ignore_empty_values: true'", fields: fields{ logger: log, - config: appendConfig{ + config: appendProcessorConfig{ Fields: []string{"array-one", "array-two", "concrete-field"}, TargetField: "target", Values: []interface{}{"value1", nil, "value2", "", nil}, @@ -309,7 +309,7 @@ func Test_appendProcessor_Run(t *testing.T) { description: "append value of a missing field with 'ignore_missing: false'", fields: fields{ logger: log, - config: appendConfig{ + config: appendProcessorConfig{ Fields: []string{"missing-field"}, TargetField: "target", IgnoreMissing: false, From 5029ac49c0499efdab4a295369c195ad53e475c5 Mon Sep 17 00:00:00 2001 From: WinIT23 Date: Mon, 17 Oct 2022 04:30:05 +0530 Subject: [PATCH 6/9] fixed linting issue. 
(removed package errors) --- libbeat/processors/actions/append.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/libbeat/processors/actions/append.go b/libbeat/processors/actions/append.go index a08e7af7439..9bae0470693 100644 --- a/libbeat/processors/actions/append.go +++ b/libbeat/processors/actions/append.go @@ -20,15 +20,12 @@ package actions import ( "fmt" - "github.com/pkg/errors" - "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/beats/v7/libbeat/processors/checks" jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-libs/mapstr" ) type appendProcessor struct { @@ -111,7 +108,7 @@ func (f *appendProcessor) appendValues(target string, fields []string, values [] val, err := event.GetValue(field) if err != nil { - if f.config.IgnoreMissing && errors.Is(err, mapstr.ErrKeyNotFound) { + if f.config.IgnoreMissing && err.Error() == "key not found" { continue } return fmt.Errorf("could not fetch value for key: %s, Error: %w", field, err) @@ -132,7 +129,7 @@ func (f *appendProcessor) appendValues(target string, fields []string, values [] arr = cleanEmptyValues(arr) } - if err := event.Delete(target); err != nil && !errors.Is(err, mapstr.ErrKeyNotFound) { + if err := event.Delete(target); err != nil && !(err.Error() == "key not found") { return fmt.Errorf("unable to delete the target field %s due to error: %w", target, err) } From 89c108ef08ee3a35f6bd3349a72f441a02787104 Mon Sep 17 00:00:00 2001 From: WinIT23 Date: Wed, 19 Oct 2022 01:00:59 +0530 Subject: [PATCH 7/9] update the description. Added type check in target-field array. Updated test case. 
--- libbeat/processors/actions/append.go | 10 ++- libbeat/processors/actions/append_test.go | 76 +++++++++++++------ .../processors/actions/docs/append.asciidoc | 2 +- 3 files changed, 62 insertions(+), 26 deletions(-) diff --git a/libbeat/processors/actions/append.go b/libbeat/processors/actions/append.go index 9bae0470693..c914e070b19 100644 --- a/libbeat/processors/actions/append.go +++ b/libbeat/processors/actions/append.go @@ -97,11 +97,16 @@ func (f *appendProcessor) Run(event *beat.Event) (*beat.Event, error) { func (f *appendProcessor) appendValues(target string, fields []string, values []interface{}, event *beat.Event) error { var arr []interface{} - val, err := event.GetValue(target) + targetVal, err := event.GetValue(target) if err != nil { f.logger.Debugf("could not fetch value for key: %s. all the values will be appended in a new key %s.", target, target) } else { - arr = append(arr, val) + targetArr, ok := targetVal.([]interface{}) + if ok { + arr = append(arr, targetArr...) 
+ } else { + arr = append(arr, targetVal) + } } for _, field := range fields { @@ -136,7 +141,6 @@ func (f *appendProcessor) appendValues(target string, fields []string, values [] if _, err := event.PutValue(target, arr); err != nil { return fmt.Errorf("unable to put values in the target field %s due to error: %w", target, err) } - return nil } diff --git a/libbeat/processors/actions/append_test.go b/libbeat/processors/actions/append_test.go index 9b7cf61980b..ac2a93ffe40 100644 --- a/libbeat/processors/actions/append_test.go +++ b/libbeat/processors/actions/append_test.go @@ -78,28 +78,6 @@ func Test_appendProcessor_appendValues(t *testing.T) { args args wantErr bool }{ - { - description: "append value in the arrays from a field when target_field is present", - args: args{ - target: "target", - fields: []string{"field"}, - event: &beat.Event{ - Meta: mapstr.M{}, - Fields: mapstr.M{ - "target": []interface{}{"value1", "value2"}, - "field": "value", - }, - }, - }, - fields: fields{ - logger: log, - config: appendProcessorConfig{ - Fields: []string{"field"}, - TargetField: "target", - }, - }, - wantErr: false, - }, { description: "append value in the arrays from a field when target_field is not present", args: args{ @@ -231,6 +209,60 @@ func Test_appendProcessor_Run(t *testing.T) { }, }, }, + { + description: "append value in the arrays from a field when target_field is present and it is a scaler", + args: args{ + event: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "target": "scaler-value", + "field": "I'm being appended", + }, + }, + }, + fields: fields{ + logger: log, + config: appendProcessorConfig{ + Fields: []string{"field"}, + TargetField: "target", + }, + }, + wantErr: false, + want: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "field": "I'm being appended", + "target": []interface{}{"scaler-value", "I'm being appended"}, + }, + }, + }, + { + description: "append value in the arrays from a field when target_field is present and it is an 
array", + args: args{ + event: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "target": []interface{}{"value1", "value2"}, + "field": "I'm being appended", + }, + }, + }, + fields: fields{ + logger: log, + config: appendProcessorConfig{ + Fields: []string{"field"}, + TargetField: "target", + }, + }, + wantErr: false, + want: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "field": "I'm being appended", + "target": []interface{}{"value1", "value2", "I'm being appended"}, + }, + }, + }, { description: "test for nested field", fields: fields{ diff --git a/libbeat/processors/actions/docs/append.asciidoc b/libbeat/processors/actions/docs/append.asciidoc index 96a73a339ad..982006345df 100644 --- a/libbeat/processors/actions/docs/append.asciidoc +++ b/libbeat/processors/actions/docs/append.asciidoc @@ -5,7 +5,7 @@ append_processor ++++ -The `append_processor` processor appends static values and values from the listed fields to target field. +The `append_processor` processor appends one or more values to an existing array if the target field already exists and it is an array. Converts a scalar to an array and appends one or more values to it if the field exists and it is a scalar. Here the values can either be one or more static values or one or more values from the fields listed under 'fields' key. `target_field`:: The field in which you want to append the data. `fields`:: () List of fields from which you want to copy data from. If the value is of a concrete type it will be appended directly to the target. From 41fbb716bb128887d7686a8e036a1a4bb2d95000 Mon Sep 17 00:00:00 2001 From: WinIT23 Date: Thu, 20 Oct 2022 00:30:08 +0530 Subject: [PATCH 8/9] Added 'allow_duplicate' parameter and code cleanup. 
--- libbeat/processors/actions/append.go | 28 ++++++-- libbeat/processors/actions/append_test.go | 67 +++++++++++++++++++ .../processors/actions/docs/append.asciidoc | 1 + 3 files changed, 92 insertions(+), 4 deletions(-) diff --git a/libbeat/processors/actions/append.go b/libbeat/processors/actions/append.go index c914e070b19..bce42c1e6ba 100644 --- a/libbeat/processors/actions/append.go +++ b/libbeat/processors/actions/append.go @@ -97,9 +97,10 @@ func (f *appendProcessor) Run(event *beat.Event) (*beat.Event, error) { func (f *appendProcessor) appendValues(target string, fields []string, values []interface{}, event *beat.Event) error { var arr []interface{} + // get the existing value of target field targetVal, err := event.GetValue(target) if err != nil { - f.logger.Debugf("could not fetch value for key: %s. all the values will be appended in a new key %s.", target, target) + f.logger.Debugf("could not fetch value for key: '%s'. Therefore, all the values will be appended in a new key %s.", target, target) } else { targetArr, ok := targetVal.([]interface{}) if ok { @@ -109,8 +110,8 @@ func (f *appendProcessor) appendValues(target string, fields []string, values [] } } + // append the values of all the fields listed under 'fields' section for _, field := range fields { - val, err := event.GetValue(field) if err != nil { if f.config.IgnoreMissing && err.Error() == "key not found" { @@ -118,7 +119,6 @@ func (f *appendProcessor) appendValues(target string, fields []string, values [] } return fmt.Errorf("could not fetch value for key: %s, Error: %w", field, err) } - valArr, ok := val.([]interface{}) if ok { arr = append(arr, valArr...) @@ -127,6 +127,7 @@ func (f *appendProcessor) appendValues(target string, fields []string, values [] } } + // append all the static values from 'values' section arr = append(arr, values...) 
// remove empty strings and nil from the array @@ -134,13 +135,19 @@ func (f *appendProcessor) appendValues(target string, fields []string, values [] arr = cleanEmptyValues(arr) } + // remove duplicate values from the array + if !f.config.AllowDuplicate { + arr = removeDuplicates(arr) + } + + // replace the existing target with new array if err := event.Delete(target); err != nil && !(err.Error() == "key not found") { return fmt.Errorf("unable to delete the target field %s due to error: %w", target, err) } - if _, err := event.PutValue(target, arr); err != nil { return fmt.Errorf("unable to put values in the target field %s due to error: %w", target, err) } + return nil } @@ -148,6 +155,7 @@ func (f *appendProcessor) String() string { return "append_processor=" + fmt.Sprintf("%+v", f.config.TargetField) } +// this function will remove all the empty strings and nil values from the array func cleanEmptyValues(dirtyArr []interface{}) (cleanArr []interface{}) { for _, val := range dirtyArr { if val == "" || val == nil { @@ -157,3 +165,15 @@ func cleanEmptyValues(dirtyArr []interface{}) (cleanArr []interface{}) { } return cleanArr } + +// this function will remove all the duplicate values from the array +func removeDuplicates(dirtyArr []interface{}) (cleanArr []interface{}) { + set := make(map[interface{}]bool, 0) + for _, val := range dirtyArr { + if _, ok := set[val]; !ok { + set[val] = true + cleanArr = append(cleanArr, val) + } + } + return cleanArr +} diff --git a/libbeat/processors/actions/append_test.go b/libbeat/processors/actions/append_test.go index ac2a93ffe40..243fb310498 100644 --- a/libbeat/processors/actions/append_test.go +++ b/libbeat/processors/actions/append_test.go @@ -384,3 +384,70 @@ func Test_appendProcessor_Run(t *testing.T) { }) } } + +func Test_removeDuplicates(t *testing.T) { + type args struct { + dirtyArr []interface{} + } + tests := []struct { + description string + args args + wantCleanArr []interface{} + }{ + { + description: "clean up 
integer array with duplicate values", + args: args{ + dirtyArr: []interface{}{1, 1, 4, 2, 3, 3, 3, 2, 3, 3, 4, 5}, + }, + wantCleanArr: []interface{}{1, 4, 2, 3, 5}, + }, + { + description: "clean up string array with duplicate values", + args: args{ + dirtyArr: []interface{}{"a", "b", "test", "a", "b"}, + }, + wantCleanArr: []interface{}{"a", "b", "test"}, + }, + { + description: "clean up string array without duplicate values", + args: args{ + dirtyArr: []interface{}{"a", "b", "test", "c", "d"}, + }, + wantCleanArr: []interface{}{"a", "b", "test", "c", "d"}, + }, + { + description: "clean up integer array without duplicate values", + args: args{ + dirtyArr: []interface{}{1, 2, 3, 4, 5}, + }, + wantCleanArr: []interface{}{1, 2, 3, 4, 5}, + }, + } + for _, tt := range tests { + t.Run(tt.description, func(t *testing.T) { + gotCleanArr := removeDuplicates(tt.args.dirtyArr) + isError := false + temp := make(map[interface{}]bool, 0) + for _, val := range gotCleanArr { + temp[val] = true + } + + if len(temp) != len(tt.wantCleanArr) { + isError = true + } + + if !isError { + for _, val := range tt.wantCleanArr { + if _, ok := temp[val]; !ok { + isError = true + break + } + } + } + + if isError { + t.Errorf("removeDuplicates() = %v, want %v", gotCleanArr, tt.wantCleanArr) + } + }) + } +} diff --git a/libbeat/processors/actions/docs/append.asciidoc b/libbeat/processors/actions/docs/append.asciidoc index 982006345df..708d884b528 100644 --- a/libbeat/processors/actions/docs/append.asciidoc +++ b/libbeat/processors/actions/docs/append.asciidoc @@ -14,6 +14,7 @@ However, if the value is an array, all the elements of the array are pushed indi `ignore_empty_values`:: (Optional) If set to `true`, all the `""` and `nil` are omitted from being appended to the target field. `fail_on_error`:: (Optional) If set to `true` and an error occurs, the changes are reverted and the original is returned. If set to `false`, processing continues if an error occurs. Default is `true`. 
+`allow_duplicate`:: (Optional) If set to `false`, the processor does not append values already present in the field. The default is `true`, which will append duplicate values in the array. `ignore_missing`:: (Optional) Indicates whether to ignore events that lack the source field. The default is `false`, which will fail processing of an event if a field is missing. From 2f6c29652677599509ed1fa727823254f67cc254 Mon Sep 17 00:00:00 2001 From: vinit-chauhan Date: Mon, 14 Nov 2022 09:01:56 +0530 Subject: [PATCH 9/9] Added entry in CHANGELOG.next --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 56c028f1dd4..d26b2ce264a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -110,6 +110,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Beats will now attempt to recover if a lockfile has not been removed {pull}[33169] - Add `http.pprof` config options for enabling block and mutex profiling. {issue}33572[33572] {pull}33576[33576] +- Added append processor, which appends concrete values or values from a field to a target field. {issue}29934[29934] {pull}33364[33364] *Auditbeat*