diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index f309326613d..8f0ca7f6edd 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -256,7 +256,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add output test to kafka output {pull}10834[10834] - Add ip fields to default_field in Elasticsearch template. {pull}11035[11035] - Gracefully shut down on SIGHUP {pull}10704[10704] -- Add `script` processor that supports using Javascript to process events. {pull}10850[10850] {pull}11260[11260] +- New processor: `copy_fields`. {pull}11303[11303] +- Add `error.message` to events when `fail_on_error` is set in `rename` and `copy_fields` processors. {pull}11303[11303] *Auditbeat* diff --git a/auditbeat/auditbeat.reference.yml b/auditbeat/auditbeat.reference.yml index 62412f67731..a67509d1efe 100644 --- a/auditbeat/auditbeat.reference.yml +++ b/auditbeat/auditbeat.reference.yml @@ -329,6 +329,16 @@ auditbeat.modules: # max_depth: 1 # target: "" # overwrite_keys: false +# +# The following example copies the value of message to message_copied +# +#processors: +#- copy_fields: +# fields: +# - from: message +# to: message_copied +# fail_on_error: true +# ignore_missing: false #============================= Elastic Cloud ================================== diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index a660eb366b8..352e575eeec 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -1040,6 +1040,16 @@ filebeat.inputs: # max_depth: 1 # target: "" # overwrite_keys: false +# +# The following example copies the value of message to message_copied +# +#processors: +#- copy_fields: +# fields: +# - from: message +# to: message_copied +# fail_on_error: true +# ignore_missing: false #============================= Elastic Cloud ================================== diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml index a7c7be6677f..e5317c15121 100644 --- a/heartbeat/heartbeat.reference.yml +++ b/heartbeat/heartbeat.reference.yml @@ -473,6 +473,16 @@ heartbeat.scheduler: # max_depth: 1 # target: "" # overwrite_keys: false +# +# The following example copies the value of message to message_copied +# +#processors: +#- copy_fields: +# fields: +# - from: message +# to: message_copied +# fail_on_error: true +# ignore_missing: false #============================= Elastic Cloud ================================== diff --git a/journalbeat/journalbeat.reference.yml b/journalbeat/journalbeat.reference.yml index f9ad8dab8d2..f48abb827a9 100644 --- a/journalbeat/journalbeat.reference.yml +++ b/journalbeat/journalbeat.reference.yml @@ -269,6 +269,16 @@ setup.template.settings: # max_depth: 1 # target: "" # overwrite_keys: false +# +# The following example copies the value of message to message_copied +# +#processors: +#- copy_fields: +# fields: +# - from: message +# to: message_copied +# fail_on_error: true +# ignore_missing: false #============================= Elastic Cloud ================================== diff --git a/libbeat/_meta/config.reference.yml b/libbeat/_meta/config.reference.yml index 6491383ac5c..7224aa9e5f5 100644 --- a/libbeat/_meta/config.reference.yml +++ b/libbeat/_meta/config.reference.yml @@ -217,6 +217,16 @@ # max_depth: 1 # target: "" # overwrite_keys: false +# +# The following example copies the value of message to message_copied +# +#processors: +#- copy_fields: +# fields: +# - from: message +# to: message_copied +# fail_on_error: true +# ignore_missing: false #============================= Elastic Cloud ================================== diff --git a/libbeat/processors/actions/copy_fields.go b/libbeat/processors/actions/copy_fields.go new file mode 100644 index 00000000000..95f39d5137b --- /dev/null +++ b/libbeat/processors/actions/copy_fields.go @@ -0,0 +1,108 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package actions + +import ( + "fmt" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors" +) + +type copyFields struct { + config copyFieldsConfig +} + +type copyFieldsConfig struct { + Fields []fromTo `config:"fields"` + IgnoreMissing bool `config:"ignore_missing"` + FailOnError bool `config:"fail_on_error"` +} + +func init() { + processors.RegisterPlugin("copy_fields", + configChecked(newCopyFields, + requireFields("fields"), + ), + ) +} + +func newCopyFields(c *common.Config) (processors.Processor, error) { + config := copyFieldsConfig{ + IgnoreMissing: false, + FailOnError: true, + } + err := c.Unpack(&config) + if err != nil { + return nil, fmt.Errorf("failed to unpack the configuration of copy processor: %s", err) + } + + f := ©Fields{ + config: config, + } + return f, nil +} + +func (f *copyFields) Run(event *beat.Event) (*beat.Event, error) { + var backup common.MapStr + if f.config.FailOnError { + backup = event.Fields.Clone() + } + + for _, field := range f.config.Fields { + err := f.copyField(field.From, field.To, event.Fields) + if err != nil && f.config.FailOnError { + errMsg := fmt.Errorf("Failed to copy fields in copy_fields processor: %s", err) + logp.Debug("copy_fields", errMsg.Error()) + event.Fields = backup + event.PutValue("error.message", errMsg.Error()) + return event, err + } + } + + return event, nil +} + +func (f *copyFields) copyField(from string, to string, fields common.MapStr) error { + exists, _ := fields.HasKey(to) + if exists { + return fmt.Errorf("target field %s already exists, drop or rename this field first", to) + } + + value, err := fields.GetValue(from) + if err != nil { + if f.config.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound { + return nil + } + return fmt.Errorf("could not fetch value for key: %s, Error: %s", from, err) + } + + _, err = fields.Put(to, value) + if err != nil { + return fmt.Errorf("could not copy value to %s: %v, %+v", to, value, err) + } + return nil +} + +func (f *copyFields) String() string { + return "copy_fields=" + fmt.Sprintf("%+v", f.config.Fields) +} diff --git a/libbeat/processors/actions/copy_fields_test.go b/libbeat/processors/actions/copy_fields_test.go new file mode 100644 index 00000000000..a3de4ae1947 --- /dev/null +++ b/libbeat/processors/actions/copy_fields_test.go @@ -0,0 +1,145 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package actions + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" +) + +func TestCopyFields(t *testing.T) { + + var tests = map[string]struct { + FromTo fromTo + Input common.MapStr + Expected common.MapStr + }{ + "copy string from message to message_copied": { + FromTo: fromTo{ + From: "message", + To: "message_copied", + }, + Input: common.MapStr{ + "message": "please copy this line", + }, + Expected: common.MapStr{ + "message": "please copy this line", + "message_copied": "please copy this line", + }, + }, + "copy string from nested key nested.message to top level field message_copied": { + FromTo: fromTo{ + From: "nested.message", + To: "message_copied", + }, + Input: common.MapStr{ + "nested": common.MapStr{ + "message": "please copy this line", + }, + }, + Expected: common.MapStr{ + "nested": common.MapStr{ + "message": "please copy this line", + }, + "message_copied": "please copy this line", + }, + }, + "copy string from fieldname with dot to message_copied": { + FromTo: fromTo{ + From: "dotted.message", + To: "message_copied", + }, + Input: common.MapStr{ + "dotted.message": "please copy this line", + }, + Expected: common.MapStr{ + "dotted.message": "please copy this line", + "message_copied": "please copy this line", + }, + }, + "copy number from fieldname with dot to dotted message.copied": { + FromTo: fromTo{ + From: "message.original", + To: "message.copied", + }, + Input: common.MapStr{ + "message.original": 42, + }, + Expected: common.MapStr{ + "message.original": 42, + "message": common.MapStr{ + "copied": 42, + }, + }, + }, + "copy number from hierarchical message.original to top level message which fails": { + FromTo: fromTo{ + From: "message.original", + To: "message", + }, + Input: common.MapStr{ + "message": common.MapStr{ + "original": 42, + }, + }, + Expected: common.MapStr{ + "message": common.MapStr{ + "original": 42, + }, + }, + }, + "copy number from hierarchical message.original to top level message": { + FromTo: fromTo{ + From: "message.original", + To: "message", + }, + Input: common.MapStr{ + "message.original": 42, + }, + Expected: common.MapStr{ + "message.original": 42, + "message": 42, + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + p := copyFields{ + copyFieldsConfig{ + Fields: []fromTo{ + test.FromTo, + }, + }, + } + + event := &beat.Event{ + Fields: test.Input, + } + + newEvent, err := p.Run(event) + assert.NoError(t, err) + + assert.Equal(t, test.Expected, newEvent.Fields) + }) + } +} diff --git a/libbeat/processors/actions/rename.go b/libbeat/processors/actions/rename.go index bd39f93ab74..7eded9bf23f 100644 --- a/libbeat/processors/actions/rename.go +++ b/libbeat/processors/actions/rename.go @@ -75,8 +75,10 @@ func (f *renameFields) Run(event *beat.Event) (*beat.Event, error) { for _, field := range f.config.Fields { err := f.renameField(field.From, field.To, event.Fields) if err != nil && f.config.FailOnError { - logp.Debug("rename", "Failed to rename fields, revert to old event: %s", err) + errMsg := fmt.Errorf("Failed to rename fields in processor: %s", err) + logp.Debug("rename", errMsg.Error()) event.Fields = backup + event.PutValue("error.message", errMsg.Error()) return event, err } } @@ -108,7 +110,7 @@ func (f *renameFields) renameField(from string, to string, fields common.MapStr) _, err = fields.Put(to, value) if err != nil { - return fmt.Errorf("could not put value: %s: %v, %+v", to, value, err) + return fmt.Errorf("could not put value: %s: %v, %v", to, value, err) } return nil } diff --git a/libbeat/processors/actions/rename_test.go b/libbeat/processors/actions/rename_test.go index cfdce8a3595..125986e8a15 100644 --- a/libbeat/processors/actions/rename_test.go +++ b/libbeat/processors/actions/rename_test.go @@ -92,6 +92,9 @@ func TestRenameRun(t *testing.T) { Output: common.MapStr{ "a": 2, "b": "q", + "error": common.MapStr{ + "message": "Failed to rename fields in processor: target field b already exists, drop or rename this field first", + }, }, error: true, FailOnError: true, @@ -188,6 +191,9 @@ func TestRenameRun(t *testing.T) { Output: common.MapStr{ "a": 9, "c": 10, + "error": common.MapStr{ + "message": "Failed to rename fields in processor: could not put value: a.c: 10, expected map but type is int", + }, }, error: true, IgnoreMissing: false, diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index 97dfa32d718..bc30e786090 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -943,6 +943,16 @@ metricbeat.modules: # max_depth: 1 # target: "" # overwrite_keys: false +# +# The following example copies the value of message to message_copied +# +#processors: +#- copy_fields: +# fields: +# - from: message +# to: message_copied +# fail_on_error: true +# ignore_missing: false #============================= Elastic Cloud ================================== diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml index 30a3e7ebb0d..63263147f22 100644 --- a/packetbeat/packetbeat.reference.yml +++ b/packetbeat/packetbeat.reference.yml @@ -697,6 +697,16 @@ packetbeat.ignore_outgoing: false # max_depth: 1 # target: "" # overwrite_keys: false +# +# The following example copies the value of message to message_copied +# +#processors: +#- copy_fields: +# fields: +# - from: message +# to: message_copied +# fail_on_error: true +# ignore_missing: false #============================= Elastic Cloud ================================== diff --git a/winlogbeat/winlogbeat.reference.yml b/winlogbeat/winlogbeat.reference.yml index 3e2f4b3f2e3..fb8853f30cb 100644 --- a/winlogbeat/winlogbeat.reference.yml +++ b/winlogbeat/winlogbeat.reference.yml @@ -246,6 +246,16 @@ winlogbeat.event_logs: # max_depth: 1 # target: "" # overwrite_keys: false +# +# The following example copies the value of message to message_copied +# +#processors: +#- copy_fields: +# fields: +# - from: message +# to: message_copied +# fail_on_error: true +# ignore_missing: false #============================= Elastic Cloud ================================== diff --git a/x-pack/auditbeat/auditbeat.reference.yml b/x-pack/auditbeat/auditbeat.reference.yml index 06409690e8a..60c5fa02fa9 100644 --- a/x-pack/auditbeat/auditbeat.reference.yml +++ b/x-pack/auditbeat/auditbeat.reference.yml @@ -364,6 +364,16 @@ auditbeat.modules: # max_depth: 1 # target: "" # overwrite_keys: false +# +# The following example copies the value of message to message_copied +# +#processors: +#- copy_fields: +# fields: +# - from: message +# to: message_copied +# fail_on_error: true +# ignore_missing: false #============================= Elastic Cloud ================================== diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 1d92c6bd61a..dc315b4f67c 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -1119,6 +1119,16 @@ filebeat.inputs: # max_depth: 1 # target: "" # overwrite_keys: false +# +# The following example copies the value of message to message_copied +# +#processors: +#- copy_fields: +# fields: +# - from: message +# to: message_copied +# fail_on_error: true +# ignore_missing: false #============================= Elastic Cloud ================================== diff --git a/x-pack/functionbeat/functionbeat.reference.yml b/x-pack/functionbeat/functionbeat.reference.yml index 6aa338385a5..67679ba65df 100644 --- a/x-pack/functionbeat/functionbeat.reference.yml +++ b/x-pack/functionbeat/functionbeat.reference.yml @@ -362,6 +362,16 @@ functionbeat.provider.aws.functions: # max_depth: 1 # target: "" # overwrite_keys: false +# +# The following example copies the value of message to message_copied +# +#processors: +#- copy_fields: +# fields: +# - from: message +# to: message_copied +# fail_on_error: true +# ignore_missing: false #============================= Elastic Cloud ================================== diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml index f2e684c4981..8d72144103c 100644 --- a/x-pack/metricbeat/metricbeat.reference.yml +++ b/x-pack/metricbeat/metricbeat.reference.yml @@ -972,6 +972,16 @@ metricbeat.modules: # max_depth: 1 # target: "" # overwrite_keys: false +# +# The following example copies the value of message to message_copied +# +#processors: +#- copy_fields: +# fields: +# - from: message +# to: message_copied +# fail_on_error: true +# ignore_missing: false #============================= Elastic Cloud ==================================