diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 854f8740435..a3afe607328 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -115,6 +115,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Beats will now attempt to recover if a lockfile has not been removed {pull}[33169] - Add `http.pprof` config options for enabling block and mutex profiling. {issue}33572[33572] {pull}33576[33576] +- Added append Processor which will append concrete values or values from a field to target. {issue}29934[29934] {pull}33364[33364] *Auditbeat* diff --git a/libbeat/processors/actions/append.go b/libbeat/processors/actions/append.go new file mode 100644 index 00000000000..bce42c1e6ba --- /dev/null +++ b/libbeat/processors/actions/append.go @@ -0,0 +1,179 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package actions + +import ( + "fmt" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/processors" + "github.com/elastic/beats/v7/libbeat/processors/checks" + jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" +) + +type appendProcessor struct { + config appendProcessorConfig + logger *logp.Logger +} + +type appendProcessorConfig struct { + Fields []string `config:"fields"` + TargetField string `config:"target_field"` + Values []interface{} `config:"values"` + IgnoreMissing bool `config:"ignore_missing"` + IgnoreEmptyValues bool `config:"ignore_empty_values"` + FailOnError bool `config:"fail_on_error"` + AllowDuplicate bool `config:"allow_duplicate"` //TODO: Add functionality to remove duplicate +} + +func init() { + processors.RegisterPlugin("append_processor", + checks.ConfigChecked(NewAppendProcessor, + checks.RequireFields("target_field"), + ), + ) + jsprocessor.RegisterPlugin("AppendProcessor", NewAppendProcessor) +} + +// NewAppendProcessor returns a new append_processor processor. +func NewAppendProcessor(c *conf.C) (processors.Processor, error) { + config := appendProcessorConfig{ + IgnoreMissing: false, + IgnoreEmptyValues: false, + FailOnError: true, + AllowDuplicate: true, + } + err := c.Unpack(&config) + if err != nil { + return nil, fmt.Errorf("failed to unpack the configuration of append processor: %w", err) + } + + f := &appendProcessor{ + config: config, + logger: logp.NewLogger("append_processor"), + } + return f, nil +} + +func (f *appendProcessor) Run(event *beat.Event) (*beat.Event, error) { + var backup *beat.Event + if f.config.FailOnError { + backup = event.Clone() + } + + err := f.appendValues(f.config.TargetField, f.config.Fields, f.config.Values, event) + if err != nil { + errMsg := fmt.Errorf("failed to append fields in append_processor processor: %w", err) + f.logger.Debug(errMsg.Error()) + if f.config.FailOnError { + event = backup + if _, err := event.PutValue("error.message", errMsg.Error()); err != nil { + return nil, fmt.Errorf("failed to append fields in append_processor processor: %w", err) + } + return event, err + } + } + + return event, nil +} + +func (f *appendProcessor) appendValues(target string, fields []string, values []interface{}, event *beat.Event) error { + var arr []interface{} + + // get the existing value of target field + targetVal, err := event.GetValue(target) + if err != nil { + f.logger.Debugf("could not fetch value for key: '%s'. Therefore, all the values will be appended in a new key %s.", target, target) + } else { + targetArr, ok := targetVal.([]interface{}) + if ok { + arr = append(arr, targetArr...) + } else { + arr = append(arr, targetVal) + } + } + + // append the values of all the fields listed under 'fields' section + for _, field := range fields { + val, err := event.GetValue(field) + if err != nil { + if f.config.IgnoreMissing && err.Error() == "key not found" { + continue + } + return fmt.Errorf("could not fetch value for key: %s, Error: %w", field, err) + } + valArr, ok := val.([]interface{}) + if ok { + arr = append(arr, valArr...) + } else { + arr = append(arr, val) + } + } + + // append all the static values from 'values' section + arr = append(arr, values...) + + // remove empty strings and nil from the array + if f.config.IgnoreEmptyValues { + arr = cleanEmptyValues(arr) + } + + // remove duplicate values from the array + if !f.config.AllowDuplicate { + arr = removeDuplicates(arr) + } + + // replace the existing target with new array + if err := event.Delete(target); err != nil && !(err.Error() == "key not found") { + return fmt.Errorf("unable to delete the target field %s due to error: %w", target, err) + } + if _, err := event.PutValue(target, arr); err != nil { + return fmt.Errorf("unable to put values in the target field %s due to error: %w", target, err) + } + + return nil +} + +func (f *appendProcessor) String() string { + return "append_processor=" + fmt.Sprintf("%+v", f.config.TargetField) +} + +// this function will remove all the empty strings and nil values from the array +func cleanEmptyValues(dirtyArr []interface{}) (cleanArr []interface{}) { + for _, val := range dirtyArr { + if val == "" || val == nil { + continue + } + cleanArr = append(cleanArr, val) + } + return cleanArr +} + +// this function will remove all the duplicate values from the array +func removeDuplicates(dirtyArr []interface{}) (cleanArr []interface{}) { + set := make(map[interface{}]bool, 0) + for _, val := range dirtyArr { + if _, ok := set[val]; !ok { + set[val] = true + cleanArr = append(cleanArr, val) + } + } + return cleanArr +} diff --git a/libbeat/processors/actions/append_test.go b/libbeat/processors/actions/append_test.go new file mode 100644 index 00000000000..243fb310498 --- /dev/null +++ b/libbeat/processors/actions/append_test.go @@ -0,0 +1,453 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package actions + +import ( + "reflect" + "testing" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +var log = logp.NewLogger("append_test") + +func Test_cleanEmptyValues(t *testing.T) { + type args struct { + dirtyArr []interface{} + } + tests := []struct { + description string + args args + wantCleanArr []interface{} + }{ + { + description: "array with empty values", + args: args{ + dirtyArr: []interface{}{"asdf", "", 12, "", nil}, + }, + wantCleanArr: []interface{}{"asdf", 12}, + }, + { + description: "array with no empty values", + args: args{ + dirtyArr: []interface{}{"asdf", "asd", 12, 123}, + }, + wantCleanArr: []interface{}{"asdf", "asd", 12, 123}, + }, + } + for _, tt := range tests { + t.Run(tt.description, func(t *testing.T) { + if gotCleanArr := cleanEmptyValues(tt.args.dirtyArr); !reflect.DeepEqual(gotCleanArr, tt.wantCleanArr) { + t.Errorf("cleanEmptyValues() = %v, want %v", gotCleanArr, tt.wantCleanArr) + } + }) + } +} + +func Test_appendProcessor_appendValues(t *testing.T) { + type fields struct { + config appendProcessorConfig + logger *logp.Logger + } + type args struct { + target string + fields []string + values []interface{} + event *beat.Event + } + tests := []struct { + description string + fields fields + args args + wantErr bool + }{ + { + description: "append value in the arrays from a field when target_field is not present", + args: args{ + target: "target", + fields: []string{"field"}, + event: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "field": "value", + }, + }, + }, + fields: fields{ + logger: log, + config: appendProcessorConfig{ + Fields: []string{"field"}, + TargetField: "target", + }, + }, + wantErr: false, + }, + { + description: "append value in the arrays from an unknown field", + args: args{ + target: "target", + fields: []string{"some-field"}, + event: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "field": "value", + }, + }, + }, + fields: fields{ + logger: log, + config: appendProcessorConfig{ + IgnoreEmptyValues: false, + IgnoreMissing: false, + AllowDuplicate: true, + FailOnError: true, + }, + }, + wantErr: true, + }, + { + description: "append value in the arrays from an unknown field with 'ignore_missing: true'", + args: args{ + target: "target", + fields: []string{"some-field"}, + event: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "field": "value", + }, + }, + }, + fields: fields{ + logger: log, + config: appendProcessorConfig{ + IgnoreEmptyValues: false, + IgnoreMissing: true, + AllowDuplicate: true, + FailOnError: true, + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.description, func(t *testing.T) { + f := &appendProcessor{ + config: tt.fields.config, + logger: tt.fields.logger, + } + if err := f.appendValues(tt.args.target, tt.args.fields, tt.args.values, tt.args.event); (err != nil) != tt.wantErr { + t.Errorf("appendProcessor.appendValues() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func Test_appendProcessor_Run(t *testing.T) { + type fields struct { + config appendProcessorConfig + logger *logp.Logger + } + type args struct { + event *beat.Event + } + tests := []struct { + description string + fields fields + args args + want *beat.Event + wantErr bool + }{ + { + description: "positive flow", + fields: fields{ + logger: log, + config: appendProcessorConfig{ + Fields: []string{"array-one", "array-two", "concrete-field"}, + TargetField: "target", + Values: []interface{}{"value1", "value2"}, + IgnoreMissing: false, + IgnoreEmptyValues: false, + FailOnError: true, + AllowDuplicate: true, + }, + }, + args: args{ + event: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "concrete-field": "some-value", + "array-one": []interface{}{"one", "", "two", "three"}, + "array-two": []interface{}{"four", "five", ""}, + }, + }, + }, + wantErr: false, + want: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "concrete-field": "some-value", + "array-one": []interface{}{"one", "", "two", "three"}, + "array-two": []interface{}{"four", "five", ""}, + "target": []interface{}{"one", "", "two", "three", "four", "five", "", "some-value", "value1", "value2"}, + }, + }, + }, + { + description: "append value in the arrays from a field when target_field is present and it is a scaler", + args: args{ + event: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "target": "scaler-value", + "field": "I'm being appended", + }, + }, + }, + fields: fields{ + logger: log, + config: appendProcessorConfig{ + Fields: []string{"field"}, + TargetField: "target", + }, + }, + wantErr: false, + want: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "field": "I'm being appended", + "target": []interface{}{"scaler-value", "I'm being appended"}, + }, + }, + }, + { + description: "append value in the arrays from a field when target_field is present and it is an array", + args: args{ + event: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "target": []interface{}{"value1", "value2"}, + "field": "I'm being appended", + }, + }, + }, + fields: fields{ + logger: log, + config: appendProcessorConfig{ + Fields: []string{"field"}, + TargetField: "target", + }, + }, + wantErr: false, + want: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "field": "I'm being appended", + "target": []interface{}{"value1", "value2", "I'm being appended"}, + }, + }, + }, + { + description: "test for nested field", + fields: fields{ + logger: log, + config: appendProcessorConfig{ + Fields: []string{"array.one", "array.two", "concrete-field"}, + TargetField: "target", + Values: []interface{}{"value1", "value2"}, + IgnoreMissing: false, + IgnoreEmptyValues: false, + FailOnError: true, + AllowDuplicate: true, + }, + }, + args: args{ + event: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "concrete-field": "some-value", + "array": mapstr.M{ + "one": []interface{}{"one", "", "two", "three"}, + "two": []interface{}{"four", "five", ""}, + }, + }, + }, + }, + wantErr: false, + want: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "concrete-field": "some-value", + "array": mapstr.M{ + "one": []interface{}{"one", "", "two", "three"}, + "two": []interface{}{"four", "five", ""}, + }, + "target": []interface{}{"one", "", "two", "three", "four", "five", "", "some-value", "value1", "value2"}, + }, + }, + }, + { + description: "remove empty values form output - 'ignore_empty_values: true'", + fields: fields{ + logger: log, + config: appendProcessorConfig{ + Fields: []string{"array-one", "array-two", "concrete-field"}, + TargetField: "target", + Values: []interface{}{"value1", nil, "value2", "", nil}, + IgnoreMissing: false, + IgnoreEmptyValues: true, + FailOnError: true, + AllowDuplicate: true, + }, + }, + args: args{ + event: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "concrete-field": "", + "array-one": []interface{}{"one", "", "two", "three"}, + "array-two": []interface{}{"four", "five", ""}, + }, + }, + }, + wantErr: false, + want: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "concrete-field": "", + "array-one": []interface{}{"one", "", "two", "three"}, + "array-two": []interface{}{"four", "five", ""}, + "target": []interface{}{"one", "two", "three", "four", "five", "value1", "value2"}, + }, + }, + }, + { + description: "append value of a missing field with 'ignore_missing: false'", + fields: fields{ + logger: log, + config: appendProcessorConfig{ + Fields: []string{"missing-field"}, + TargetField: "target", + IgnoreMissing: false, + IgnoreEmptyValues: false, + FailOnError: true, + AllowDuplicate: true, + }, + }, + args: args{ + event: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{}, + }, + }, + wantErr: true, + want: &beat.Event{ + Meta: mapstr.M{}, + Fields: mapstr.M{ + "error": mapstr.M{ + "message": "failed to append fields in append_processor processor: could not fetch value for key: missing-field, Error: key not found", + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.description, func(t *testing.T) { + f := &appendProcessor{ + config: tt.fields.config, + logger: tt.fields.logger, + } + got, err := f.Run(tt.args.event) + if (err != nil) != tt.wantErr { + t.Errorf("appendProcessor.Run() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("appendProcessor.Run() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_removeDuplicates(t *testing.T) { + type args struct { + dirtyArr []interface{} + } + tests := []struct { + description string + args args + wantCleanArr []interface{} + }{ + { + description: "clean up integer array with duplicate values", + args: args{ + dirtyArr: []interface{}{1, 1, 4, 2, 3, 3, 3, 2, 3, 3, 4, 5}, + }, + wantCleanArr: []interface{}{1, 4, 2, 3, 5}, + }, + { + description: "clean up string array with duplicate values", + args: args{ + dirtyArr: []interface{}{"a", "b", "test", "a", "b"}, + }, + wantCleanArr: []interface{}{"a", "b", "test"}, + }, + { + description: "clean up string array without duplicate values", + args: args{ + dirtyArr: []interface{}{"a", "b", "test", "c", "d"}, + }, + wantCleanArr: []interface{}{"a", "b", "test", "c", "d"}, + }, + { + description: "clean up integer array without duplicate values", + args: args{ + dirtyArr: []interface{}{1, 2, 3, 4, 5}, + }, + wantCleanArr: []interface{}{1, 2, 3, 4, 5}, + }, + } + for _, tt := range tests { + t.Run(tt.description, func(t *testing.T) { + gotCleanArr := removeDuplicates(tt.args.dirtyArr) + isError := false + temp := make(map[interface{}]bool, 0) + for _, val := range gotCleanArr { + temp[val] = true + } + + if len(temp) != len(tt.wantCleanArr) { + isError = true + } + + if !isError { + for _, val := range tt.wantCleanArr { + if _, ok := temp[val]; !ok { + isError = true + break + } + } + } + + if isError { + t.Errorf("removeDuplicates() = %v, want %v", gotCleanArr, tt.wantCleanArr) + } + }) + } +} diff --git a/libbeat/processors/actions/docs/append.asciidoc b/libbeat/processors/actions/docs/append.asciidoc new file mode 100644 index 00000000000..708d884b528 --- /dev/null +++ b/libbeat/processors/actions/docs/append.asciidoc @@ -0,0 +1,55 @@ +[[append]] +=== Append Processor + +++++ +append_processor +++++ + +The `append_processor` processor appends one or more values to an existing array if the target field already exists and it is an array. COnverts a scaler to an array and appends one or more values to it if the field exists and it is a scaler. Here the values can either be one or more static values or one or more values from the fields listed under 'fields' key. + +`target_field`:: The field in which you want to append the data. +`fields`:: () List of fields from which you want to copy data from. If the value is of a concrete type it will be appended directly to the target. +However, if the value is an array, all the elements of the array are pushed individually to the target field. +`values`:: (Optional) List of static values you want to append to target field. +`ignore_empty_values`:: (Optional) If set to `true`, all the `""` and `nil` are omitted from being appended to the target field. +`fail_on_error`:: (Optional) If set to `true` and an error occurs, the changes are reverted and the original is returned. If set to `false`, +processing continues if an error occurs. Default is `true`. +`allow_duplicate`:: (Optional) If set to `false`, the processor does not append values already present in the field. The default is `true`, which will append duplicate values in the array. +`ignore_missing`:: (Optional) Indicates whether to ignore events that lack the source + field. The default is `false`, which will fail processing of + an event if a field is missing. + +note: If you want to use `fields` parameter with fields under `message`, make sure you use `decode_json_fields` first with `target: ""`. + +For example, this configuration: + +[source,yaml] +------------------------------------------------------------------------------ +processors: + - decode_json_fields: + fields: message + target: "" + - append_processor: + target_field: my-target-field + fields: + - concrete.field + - array.one + values: + - static-value + - "" + ignore_missing: true + fail_on_error: true + ignore_empty_values: true +------------------------------------------------------------------------------ + +Copies the values of `concrete.field`, `array.one` from response field to `my-target-field`: + +[source,json] +------------------------------------------------------------------------------- +{ + "message": "my-interesting-message", + "event": { + "original": "my-interesting-message" + } +} +-------------------------------------------------------------------------------