Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New processor: copy_fields #11303

Merged
merged 13 commits into from
Mar 22, 2019
3 changes: 2 additions & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add output test to kafka output {pull}10834[10834]
- Add ip fields to default_field in Elasticsearch template. {pull}11035[11035]
- Gracefully shut down on SIGHUP {pull}10704[10704]
- Add `script` processor that supports using Javascript to process events. {pull}10850[10850] {pull}11260[11260]
- New processor: `copy_fields`. {pull}11303[11303]
- Add `error.message` to events when `fail_on_error` is set in `rename` and `copy_fields` processors. {pull}11303[11303]

*Auditbeat*

Expand Down
10 changes: 10 additions & 0 deletions auditbeat/auditbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,16 @@ auditbeat.modules:
# max_depth: 1
# target: ""
# overwrite_keys: false
#
# The following example copies the value of message to message_copied
#
#processors:
#- copy_fields:
# fields:
# - from: message
# to: message_copied
# fail_on_error: true
# ignore_missing: false

#============================= Elastic Cloud ==================================

Expand Down
10 changes: 10 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1040,6 +1040,16 @@ filebeat.inputs:
# max_depth: 1
# target: ""
# overwrite_keys: false
#
# The following example copies the value of message to message_copied
#
#processors:
#- copy_fields:
# fields:
# - from: message
# to: message_copied
# fail_on_error: true
# ignore_missing: false

#============================= Elastic Cloud ==================================

Expand Down
10 changes: 10 additions & 0 deletions heartbeat/heartbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,16 @@ heartbeat.scheduler:
# max_depth: 1
# target: ""
# overwrite_keys: false
#
# The following example copies the value of message to message_copied
#
#processors:
#- copy_fields:
# fields:
# - from: message
# to: message_copied
# fail_on_error: true
# ignore_missing: false

#============================= Elastic Cloud ==================================

Expand Down
10 changes: 10 additions & 0 deletions journalbeat/journalbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,16 @@ setup.template.settings:
# max_depth: 1
# target: ""
# overwrite_keys: false
#
# The following example copies the value of message to message_copied
#
#processors:
#- copy_fields:
# fields:
# - from: message
# to: message_copied
# fail_on_error: true
# ignore_missing: false

#============================= Elastic Cloud ==================================

Expand Down
10 changes: 10 additions & 0 deletions libbeat/_meta/config.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,16 @@
# max_depth: 1
# target: ""
# overwrite_keys: false
#
# The following example copies the value of message to message_copied
#
#processors:
#- copy_fields:
# fields:
# - from: message
# to: message_copied
# fail_on_error: true
# ignore_missing: false

#============================= Elastic Cloud ==================================

Expand Down
108 changes: 108 additions & 0 deletions libbeat/processors/actions/copy_fields.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package actions

import (
"fmt"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
)

type copyFields struct {
config copyFieldsConfig
}

type copyFieldsConfig struct {
Fields []fromTo `config:"fields"`
IgnoreMissing bool `config:"ignore_missing"`
FailOnError bool `config:"fail_on_error"`
}

func init() {
processors.RegisterPlugin("copy_fields",
configChecked(newCopyFields,
requireFields("fields"),
),
)
}

func newCopyFields(c *common.Config) (processors.Processor, error) {
config := copyFieldsConfig{
IgnoreMissing: false,
FailOnError: true,
}
err := c.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("failed to unpack the configuration of copy processor: %s", err)
}

f := &copyFields{
config: config,
}
return f, nil
}

func (f *copyFields) Run(event *beat.Event) (*beat.Event, error) {
var backup common.MapStr
if f.config.FailOnError {
backup = event.Fields.Clone()
}

for _, field := range f.config.Fields {
err := f.copyField(field.From, field.To, event.Fields)
if err != nil && f.config.FailOnError {
errMsg := fmt.Errorf("Failed to copy fields in copy_fields processor: %s", err)
logp.Debug("copy_fields", errMsg.Error())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we have a selector for logger? I like what @andrewkroh's did in the script processor, see

Tag string `config:"tag"` // Processor ID for debug and metrics.

And

logName = "processor.javascript"

Copy link
Contributor Author

@kvch kvch Mar 22, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH in case of this simple processor, I don't see the added value of letting a user set a debug selector. I doubt that users would need to filter for a specific copy processor in their logs, or if they have to, I think messages let then identify the processor easily.

If we want to let user set a selector, it would be much better to let them configure it for all processors. There are other processors which would benefit more from having such setting than this one. What if I open an issue and implement the functionality for all processors in a follow up PR?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am OK with a followup PR.

event.Fields = backup
event.PutValue("error.message", errMsg.Error())
return event, err
}
}

return event, nil
}

func (f *copyFields) copyField(from string, to string, fields common.MapStr) error {
exists, _ := fields.HasKey(to)
ruflin marked this conversation as resolved.
Show resolved Hide resolved
if exists {
return fmt.Errorf("target field %s already exists, drop or rename this field first", to)
}

value, err := fields.GetValue(from)
if err != nil {
if f.config.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound {
return nil
}
return fmt.Errorf("could not fetch value for key: %s, Error: %s", from, err)
}

_, err = fields.Put(to, value)
if err != nil {
return fmt.Errorf("could not copy value to %s: %v, %+v", to, value, err)
}
return nil
}

func (f *copyFields) String() string {
return "copy_fields=" + fmt.Sprintf("%+v", f.config.Fields)
}
145 changes: 145 additions & 0 deletions libbeat/processors/actions/copy_fields_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package actions

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
)

func TestCopyFields(t *testing.T) {

var tests = map[string]struct {
FromTo fromTo
Input common.MapStr
Expected common.MapStr
}{
"copy string from message to message_copied": {
FromTo: fromTo{
From: "message",
To: "message_copied",
},
Input: common.MapStr{
kvch marked this conversation as resolved.
Show resolved Hide resolved
"message": "please copy this line",
},
Expected: common.MapStr{
"message": "please copy this line",
"message_copied": "please copy this line",
},
},
"copy string from nested key nested.message to top level field message_copied": {
FromTo: fromTo{
From: "nested.message",
To: "message_copied",
},
Input: common.MapStr{
"nested": common.MapStr{
"message": "please copy this line",
},
},
Expected: common.MapStr{
"nested": common.MapStr{
"message": "please copy this line",
},
"message_copied": "please copy this line",
},
},
"copy string from fieldname with dot to message_copied": {
FromTo: fromTo{
From: "dotted.message",
To: "message_copied",
},
Input: common.MapStr{
"dotted.message": "please copy this line",
},
Expected: common.MapStr{
"dotted.message": "please copy this line",
"message_copied": "please copy this line",
},
},
"copy number from fieldname with dot to dotted message.copied": {
FromTo: fromTo{
From: "message.original",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if you copy message.original to message? Will it fail?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I set message.original as a single key containing a dot, so it does not lead to an error when someone wants to manipulate the submaps under message. It is a different key.

If the test input was different e.g. the key message was already present

Input: common.MapStr{
    "message": common.MapStr{
        "original": 42,
    },
}

It would fail and tell the user to drop the field message first, before attempting to copy stuff.

This is a more realistic test case, so I am adding it also.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, good to know. Could you add a test case for both and one of them would fail. The reason I'm asking also for the failing one is because in case we change the behaviour of the above in the future, we will know that it has an impact.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test with the description "copy number from hierarchical message.original to top level message which fails" will fail if the behaviour changes, as the new field would show up in the output. I have also added a new test case which demonstrates what happens when the key contains a dot and is present as a top level key in the fields. If MapStr is changed one of the tests will fail.

To: "message.copied",
},
Input: common.MapStr{
"message.original": 42,
},
Expected: common.MapStr{
"message.original": 42,
"message": common.MapStr{
"copied": 42,
},
},
},
"copy number from hierarchical message.original to top level message which fails": {
FromTo: fromTo{
From: "message.original",
To: "message",
},
Input: common.MapStr{
"message": common.MapStr{
"original": 42,
},
},
Expected: common.MapStr{
"message": common.MapStr{
"original": 42,
},
},
},
"copy number from hierarchical message.original to top level message": {
FromTo: fromTo{
From: "message.original",
To: "message",
},
Input: common.MapStr{
"message.original": 42,
},
Expected: common.MapStr{
"message.original": 42,
"message": 42,
},
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
p := copyFields{
copyFieldsConfig{
Fields: []fromTo{
test.FromTo,
},
},
}

event := &beat.Event{
Fields: test.Input,
}

newEvent, err := p.Run(event)
assert.NoError(t, err)

assert.Equal(t, test.Expected, newEvent.Fields)
})
}
}
6 changes: 4 additions & 2 deletions libbeat/processors/actions/rename.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ func (f *renameFields) Run(event *beat.Event) (*beat.Event, error) {
for _, field := range f.config.Fields {
err := f.renameField(field.From, field.To, event.Fields)
if err != nil && f.config.FailOnError {
logp.Debug("rename", "Failed to rename fields, revert to old event: %s", err)
errMsg := fmt.Errorf("Failed to rename fields in processor: %s", err)
logp.Debug("rename", errMsg.Error())
event.Fields = backup
event.PutValue("error.message", errMsg.Error())
return event, err
}
}
Expand Down Expand Up @@ -108,7 +110,7 @@ func (f *renameFields) renameField(from string, to string, fields common.MapStr)

_, err = fields.Put(to, value)
if err != nil {
return fmt.Errorf("could not put value: %s: %v, %+v", to, value, err)
return fmt.Errorf("could not put value: %s: %v, %v", to, value, err)
}
return nil
}
Expand Down
6 changes: 6 additions & 0 deletions libbeat/processors/actions/rename_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ func TestRenameRun(t *testing.T) {
Output: common.MapStr{
"a": 2,
"b": "q",
"error": common.MapStr{
"message": "Failed to rename fields in processor: target field b already exists, drop or rename this field first",
},
},
error: true,
FailOnError: true,
Expand Down Expand Up @@ -188,6 +191,9 @@ func TestRenameRun(t *testing.T) {
Output: common.MapStr{
"a": 9,
"c": 10,
"error": common.MapStr{
"message": "Failed to rename fields in processor: could not put value: a.c: 10, expected map but type is int",
},
},
error: true,
IgnoreMissing: false,
Expand Down
Loading