Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement][libbeat] Add append processor #33364

Merged
merged 15 commits into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]

- Beats will now attempt to recover if a lockfile has not been removed {pull}[33169]
- Add `http.pprof` config options for enabling block and mutex profiling. {issue}33572[33572] {pull}33576[33576]
- Added append Processor which will append concrete values or values from a field to target. {issue}29934[29934] {pull}33364[33364]

*Auditbeat*

Expand Down
179 changes: 179 additions & 0 deletions libbeat/processors/actions/append.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package actions

import (
"fmt"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/checks"
jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

type appendProcessor struct {
config appendProcessorConfig
logger *logp.Logger
}

type appendProcessorConfig struct {
Fields []string `config:"fields"`
TargetField string `config:"target_field"`
Values []interface{} `config:"values"`
IgnoreMissing bool `config:"ignore_missing"`
IgnoreEmptyValues bool `config:"ignore_empty_values"`
FailOnError bool `config:"fail_on_error"`
AllowDuplicate bool `config:"allow_duplicate"` //TODO: Add functionality to remove duplicate
}

func init() {
processors.RegisterPlugin("append_processor",
checks.ConfigChecked(NewAppendProcessor,
checks.RequireFields("target_field"),
),
)
jsprocessor.RegisterPlugin("AppendProcessor", NewAppendProcessor)
}

// NewAppendProcessor returns a new append_processor processor.
func NewAppendProcessor(c *conf.C) (processors.Processor, error) {
config := appendProcessorConfig{
IgnoreMissing: false,
IgnoreEmptyValues: false,
FailOnError: true,
AllowDuplicate: true,
}
err := c.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("failed to unpack the configuration of append processor: %w", err)
}

f := &appendProcessor{
config: config,
logger: logp.NewLogger("append_processor"),
}
return f, nil
}

func (f *appendProcessor) Run(event *beat.Event) (*beat.Event, error) {
var backup *beat.Event
if f.config.FailOnError {
backup = event.Clone()
}

err := f.appendValues(f.config.TargetField, f.config.Fields, f.config.Values, event)
if err != nil {
errMsg := fmt.Errorf("failed to append fields in append_processor processor: %w", err)
f.logger.Debug(errMsg.Error())
if f.config.FailOnError {
event = backup
if _, err := event.PutValue("error.message", errMsg.Error()); err != nil {
return nil, fmt.Errorf("failed to append fields in append_processor processor: %w", err)
}
return event, err
}
}

return event, nil
}

func (f *appendProcessor) appendValues(target string, fields []string, values []interface{}, event *beat.Event) error {
var arr []interface{}

// get the existing value of target field
targetVal, err := event.GetValue(target)
if err != nil {
f.logger.Debugf("could not fetch value for key: '%s'. Therefore, all the values will be appended in a new key %s.", target, target)
} else {
targetArr, ok := targetVal.([]interface{})
if ok {
arr = append(arr, targetArr...)
} else {
arr = append(arr, targetVal)
}
}

// append the values of all the fields listed under 'fields' section
for _, field := range fields {
val, err := event.GetValue(field)
if err != nil {
if f.config.IgnoreMissing && err.Error() == "key not found" {
continue
}
return fmt.Errorf("could not fetch value for key: %s, Error: %w", field, err)
}
valArr, ok := val.([]interface{})
if ok {
arr = append(arr, valArr...)
} else {
arr = append(arr, val)
}
}

// append all the static values from 'values' section
arr = append(arr, values...)

// remove empty strings and nil from the array
if f.config.IgnoreEmptyValues {
arr = cleanEmptyValues(arr)
}

// remove duplicate values from the array
if !f.config.AllowDuplicate {
arr = removeDuplicates(arr)
}

// replace the existing target with new array
if err := event.Delete(target); err != nil && !(err.Error() == "key not found") {
return fmt.Errorf("unable to delete the target field %s due to error: %w", target, err)
}
if _, err := event.PutValue(target, arr); err != nil {
return fmt.Errorf("unable to put values in the target field %s due to error: %w", target, err)
}

return nil
}

func (f *appendProcessor) String() string {
return "append_processor=" + fmt.Sprintf("%+v", f.config.TargetField)
}

// this function will remove all the empty strings and nil values from the array
func cleanEmptyValues(dirtyArr []interface{}) (cleanArr []interface{}) {
for _, val := range dirtyArr {
if val == "" || val == nil {
continue
}
cleanArr = append(cleanArr, val)
}
return cleanArr
}

// this function will remove all the duplicate values from the array
func removeDuplicates(dirtyArr []interface{}) (cleanArr []interface{}) {
set := make(map[interface{}]bool, 0)
for _, val := range dirtyArr {
if _, ok := set[val]; !ok {
set[val] = true
cleanArr = append(cleanArr, val)
}
}
return cleanArr
}
Loading