Skip to content

Commit

Permalink
Make set processor with copy_from compatible with ES < 7.13
Browse files Browse the repository at this point in the history
Automatically rewrite Ingest Node `set` processors that use `copy_from` when connected to
Elasticsearch versions less than 7.13. The copy_from is replaced with `{{{$copy_from value}}}`.
The triple brace ensures the behavior is the same as copy_from in that no escaping is added.
  • Loading branch information
andrewkroh committed Jun 29, 2021
1 parent 4accfa8 commit 2b94416
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix bug in `httpjson` that prevented `first_event` getting updated. {pull}26407[26407]
- Fix bug in the Syslog input that misparsed rfc5424 days starting with 0. {pull}26419[26419]
- Do not close filestream harvester if an unexpected error is returned when close.on_state_change.* is enabled. {pull}26411[26411]
- Fix module pipeline compatability for Elasticsearch versions less than 7.13.0 in modules that use Ingest Node `set` processors with `copy_from`. {pull}26593[26593]

*Filebeat*

Expand Down
23 changes: 23 additions & 0 deletions filebeat/fileset/compatibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ var processorCompatibilityChecks = []processorCompatibility{
},
adaptConfig: deleteProcessor,
},
{
procType: "set",
checkVersion: func(esVersion *common.Version) bool {
return esVersion.LessThan(common.MustNewVersion("7.13.0"))
},
adaptConfig: replaceSetCopyFromValue,
},
{
procType: "set",
checkVersion: func(esVersion *common.Version) bool {
Expand Down Expand Up @@ -174,6 +181,22 @@ func replaceSetIgnoreEmptyValue(config map[string]interface{}, log *logp.Logger)
return false, nil
}

// replaceSetCopyFromValue replaces copy_from option with a 'value'.
func replaceSetCopyFromValue(config map[string]interface{}, log *logp.Logger) (bool, error) {
copyFrom, ok := config["copy_from"].(string)
if !ok {
return false, nil
}

log.Debug("Removing unsupported 'copy_from' from set processor.")
delete(config, "copy_from")

value := "{{{" + copyFrom + "}}}"
log.Debugf("Adding 'value: %q' to replace 'copy_from' in set processor.", value)
config["value"] = value
return false, nil
}

// replaceAppendAllowDuplicates replaces allow_duplicates option with an if statement
// so ES less than 7.10 will work.
func replaceAppendAllowDuplicates(config map[string]interface{}, log *logp.Logger) (bool, error) {
Expand Down
63 changes: 63 additions & 0 deletions filebeat/fileset/compatibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,69 @@ func TestReplaceSetIgnoreEmptyValue(t *testing.T) {
}
}

func TestReplaceSetCopyFromValue(t *testing.T) {
setWithCopyFrom := map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"copy_from": "panw.panos.ruleset",
},
},
}}

cases := []struct {
name string
esVersion *common.Version
content map[string]interface{}
expected map[string]interface{}
isErrExpected bool
}{
{
name: "ES < 7.13.0",
esVersion: common.MustNewVersion("7.12.2"),
content: setWithCopyFrom,
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{{panw.panos.ruleset}}}",
},
},
},
},
isErrExpected: false,
},
{
name: "ES == 7.13.0",
esVersion: common.MustNewVersion("7.13.0"),
content: setWithCopyFrom,
expected: setWithCopyFrom,
},
{
name: "ES > 7.13.0",
esVersion: common.MustNewVersion("8.0.0"),
content: setWithCopyFrom,
expected: setWithCopyFrom,
},
}

for _, test := range cases {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
err := adaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName))
if test.isErrExpected {
assert.Error(t, err)
} else {
require.NoError(t, err)
assert.Equal(t, test.expected, test.content, test.name)
}
})
}
}

func TestReplaceAppendAllowDuplicates(t *testing.T) {
cases := []struct {
name string
Expand Down

0 comments on commit 2b94416

Please sign in to comment.