diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 933500f7cfb..a3760f72894 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -32,6 +32,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif
- The extension of the log files of Beats and Elastic Agent is changed to `.ndjson`. If you are collecting the logs, you must change the path configuration to `/path/to/logs/{beatname}*.ndjson` to avoid any issues. {pull}28927[28927]
- Remove legacy support for SSLv3. {pull}30071[30071]
- `add_fields` processor is now able to set metadata in events {pull}30092[30092]
+- Add metadata change support for some processors {pull}30183[30183]
*Auditbeat*
diff --git a/libbeat/beat/event.go b/libbeat/beat/event.go
index 7fd016a14dc..bc8ffe6c3e9 100644
--- a/libbeat/beat/event.go
+++ b/libbeat/beat/event.go
@@ -71,6 +71,17 @@ func (e *Event) GetValue(key string) (interface{}, error) {
return e.Fields.GetValue(key)
}
+// Clone creates an exact copy of the event
+func (e *Event) Clone() *Event {
+ return &Event{
+ Timestamp: e.Timestamp,
+ Meta: e.Meta.Clone(),
+ Fields: e.Fields.Clone(),
+ Private: e.Private,
+ TimeSeries: e.TimeSeries,
+ }
+}
+
// DeepUpdate recursively copies the key-value pairs from `d` to various properties of the event.
// When the key equals `@timestamp` it's set as the `Timestamp` property of the event.
// When the key equals `@metadata` the update is routed into the `Meta` map instead of `Fields`
diff --git a/libbeat/processors/actions/add_network_direction_test.go b/libbeat/processors/actions/add_network_direction_test.go
index 54fb170426b..da507a18374 100644
--- a/libbeat/processors/actions/add_network_direction_test.go
+++ b/libbeat/processors/actions/add_network_direction_test.go
@@ -77,4 +77,30 @@ func TestNetworkDirection(t *testing.T) {
}
})
}
+
+ t.Run("supports metadata as a target", func(t *testing.T) {
+ evt := beat.Event{
+ Meta: common.MapStr{},
+ Fields: common.MapStr{
+ "source": "1.1.1.1",
+ "destination": "8.8.8.8",
+ },
+ }
+ p, err := NewAddNetworkDirection(common.MustNewConfigFrom(map[string]interface{}{
+ "source": "source",
+ "destination": "destination",
+ "target": "@metadata.direction",
+ "internal_networks": "private",
+ }))
+ require.NoError(t, err)
+
+ expectedMeta := common.MapStr{
+ "direction": "external",
+ }
+
+ observed, err := p.Run(&evt)
+ require.NoError(t, err)
+ require.Equal(t, expectedMeta, observed.Meta)
+ require.Equal(t, evt.Fields, observed.Fields)
+ })
}
diff --git a/libbeat/processors/actions/copy_fields.go b/libbeat/processors/actions/copy_fields.go
index 43a797e0fd5..0b77da01450 100644
--- a/libbeat/processors/actions/copy_fields.go
+++ b/libbeat/processors/actions/copy_fields.go
@@ -69,18 +69,18 @@ func NewCopyFields(c *common.Config) (processors.Processor, error) {
}
func (f *copyFields) Run(event *beat.Event) (*beat.Event, error) {
- var backup common.MapStr
+ var backup *beat.Event
if f.config.FailOnError {
- backup = event.Fields.Clone()
+ backup = event.Clone()
}
for _, field := range f.config.Fields {
- err := f.copyField(field.From, field.To, event.Fields)
+ err := f.copyField(field.From, field.To, event)
if err != nil {
errMsg := fmt.Errorf("Failed to copy fields in copy_fields processor: %s", err)
f.logger.Debug(errMsg.Error())
if f.config.FailOnError {
- event.Fields = backup
+ event = backup
event.PutValue("error.message", errMsg.Error())
return event, err
}
@@ -90,13 +90,13 @@ func (f *copyFields) Run(event *beat.Event) (*beat.Event, error) {
return event, nil
}
-func (f *copyFields) copyField(from string, to string, fields common.MapStr) error {
- exists, _ := fields.HasKey(to)
- if exists {
+func (f *copyFields) copyField(from string, to string, event *beat.Event) error {
+ _, err := event.GetValue(to)
+ if err == nil {
return fmt.Errorf("target field %s already exists, drop or rename this field first", to)
}
- value, err := fields.GetValue(from)
+ value, err := event.GetValue(from)
if err != nil {
if f.config.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound {
return nil
@@ -104,7 +104,7 @@ func (f *copyFields) copyField(from string, to string, fields common.MapStr) err
return fmt.Errorf("could not fetch value for key: %s, Error: %s", from, err)
}
- _, err = fields.Put(to, cloneValue(value))
+ _, err = event.PutValue(to, cloneValue(value))
if err != nil {
return fmt.Errorf("could not copy value to %s: %v, %+v", to, value, err)
}
diff --git a/libbeat/processors/actions/copy_fields_test.go b/libbeat/processors/actions/copy_fields_test.go
index 96b382a596a..5e890b5cbe6 100644
--- a/libbeat/processors/actions/copy_fields_test.go
+++ b/libbeat/processors/actions/copy_fields_test.go
@@ -168,4 +168,35 @@ func TestCopyFields(t *testing.T) {
assert.Equal(t, test.Expected, newEvent.Fields)
})
}
+
+ t.Run("supports metadata as a target", func(t *testing.T) {
+ p := copyFields{
+ copyFieldsConfig{
+ Fields: []fromTo{
+ {
+ From: "@metadata.message",
+ To: "@metadata.message_copied",
+ },
+ },
+ },
+ log,
+ }
+
+ event := &beat.Event{
+ Meta: common.MapStr{
+ "message": "please copy this line",
+ },
+ }
+
+ expMeta := common.MapStr{
+ "message": "please copy this line",
+ "message_copied": "please copy this line",
+ }
+
+ newEvent, err := p.Run(event)
+ assert.NoError(t, err)
+
+ assert.Equal(t, expMeta, newEvent.Meta)
+ assert.Equal(t, event.Fields, newEvent.Fields)
+ })
}
diff --git a/libbeat/processors/actions/decode_base64_field.go b/libbeat/processors/actions/decode_base64_field.go
index 55e61c3fd61..63973be2129 100644
--- a/libbeat/processors/actions/decode_base64_field.go
+++ b/libbeat/processors/actions/decode_base64_field.go
@@ -74,10 +74,10 @@ func NewDecodeBase64Field(c *common.Config) (processors.Processor, error) {
}
func (f *decodeBase64Field) Run(event *beat.Event) (*beat.Event, error) {
- var backup common.MapStr
+ var backup *beat.Event
// Creates a copy of the event to revert in case of failure
if f.config.FailOnError {
- backup = event.Fields.Clone()
+ backup = event.Clone()
}
err := f.decodeField(event)
@@ -85,7 +85,7 @@ func (f *decodeBase64Field) Run(event *beat.Event) (*beat.Event, error) {
errMsg := fmt.Errorf("failed to decode base64 fields in processor: %v", err)
f.log.Debug(errMsg.Error())
if f.config.FailOnError {
- event.Fields = backup
+ event = backup
event.PutValue("error.message", errMsg.Error())
return event, err
}
diff --git a/libbeat/processors/actions/decode_base64_field_test.go b/libbeat/processors/actions/decode_base64_field_test.go
index b0bc3689709..3355400e9f0 100644
--- a/libbeat/processors/actions/decode_base64_field_test.go
+++ b/libbeat/processors/actions/decode_base64_field_test.go
@@ -221,4 +221,37 @@ func TestDecodeBase64Run(t *testing.T) {
assert.Equal(t, test.Output, newEvent.Fields)
})
}
+
+ t.Run("supports a metadata field", func(t *testing.T) {
+ config := base64Config{
+ Field: fromTo{
+ From: "field1",
+ To: "@metadata.field",
+ },
+ }
+
+ event := &beat.Event{
+ Meta: common.MapStr{},
+ Fields: common.MapStr{
+ "field1": "Y29ycmVjdCBkYXRh",
+ },
+ }
+
+ f := &decodeBase64Field{
+ log: logp.NewLogger(processorName),
+ config: config,
+ }
+
+ expectedFields := common.MapStr{
+ "field1": "Y29ycmVjdCBkYXRh",
+ }
+ expectedMeta := common.MapStr{
+ "field": "correct data",
+ }
+
+ newEvent, err := f.Run(event)
+ assert.NoError(t, err)
+ assert.Equal(t, expectedFields, newEvent.Fields)
+ assert.Equal(t, expectedMeta, newEvent.Meta)
+ })
}
diff --git a/libbeat/processors/actions/decode_json_fields_test.go b/libbeat/processors/actions/decode_json_fields_test.go
index f58efd29b46..81d90dcf32d 100644
--- a/libbeat/processors/actions/decode_json_fields_test.go
+++ b/libbeat/processors/actions/decode_json_fields_test.go
@@ -257,6 +257,46 @@ func TestTargetRootOption(t *testing.T) {
assert.Equal(t, expected.String(), actual.String())
}
+func TestTargetMetadata(t *testing.T) {
+ event := &beat.Event{
+ Fields: common.MapStr{
+ "msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}",
+ "pipeline": "us1",
+ },
+ Meta: common.MapStr{},
+ }
+
+ testConfig, _ = common.NewConfigFrom(map[string]interface{}{
+ "fields": fields,
+ "process_array": false,
+ "max_depth": 2,
+ "target": "@metadata.json",
+ })
+
+ log := logp.NewLogger("decode_json_fields_test")
+
+ p, err := NewDecodeJSONFields(testConfig)
+ if err != nil {
+ log.Error("Error initializing decode_json_fields")
+ t.Fatal(err)
+ }
+
+ actual, _ := p.Run(event)
+
+ expectedMeta := common.MapStr{
+ "json": map[string]interface{}{
+ "log": map[string]interface{}{
+ "level": "info",
+ },
+ "stream": "stderr",
+ "count": int64(3),
+ },
+ }
+
+ assert.Equal(t, expectedMeta, actual.Meta)
+ assert.Equal(t, event.Fields, actual.Fields)
+}
+
func TestNotJsonObjectOrArray(t *testing.T) {
var cases = []struct {
MaxDepth int
diff --git a/libbeat/processors/actions/decompress_gzip_field.go b/libbeat/processors/actions/decompress_gzip_field.go
index 21c5b7f88b5..f8e77983d25 100644
--- a/libbeat/processors/actions/decompress_gzip_field.go
+++ b/libbeat/processors/actions/decompress_gzip_field.go
@@ -67,9 +67,9 @@ func NewDecompressGzipFields(c *common.Config) (processors.Processor, error) {
// Run applies the decompress_gzip_fields processor to an event.
func (f *decompressGzipField) Run(event *beat.Event) (*beat.Event, error) {
- var backup common.MapStr
+ var backup *beat.Event
if f.config.FailOnError {
- backup = event.Fields.Clone()
+ backup = event.Clone()
}
err := f.decompressGzipField(event)
@@ -77,7 +77,7 @@ func (f *decompressGzipField) Run(event *beat.Event) (*beat.Event, error) {
errMsg := fmt.Errorf("Failed to decompress field in decompress_gzip_field processor: %v", err)
f.log.Debug(errMsg.Error())
if f.config.FailOnError {
- event.Fields = backup
+ event = backup
event.PutValue("error.message", errMsg.Error())
return event, err
}
diff --git a/libbeat/processors/actions/decompress_gzip_field_test.go b/libbeat/processors/actions/decompress_gzip_field_test.go
index 73d000563b6..8beaf8b71a0 100644
--- a/libbeat/processors/actions/decompress_gzip_field_test.go
+++ b/libbeat/processors/actions/decompress_gzip_field_test.go
@@ -188,4 +188,38 @@ func TestDecompressGzip(t *testing.T) {
assert.Equal(t, test.output, newEvent.Fields)
})
}
+
+ t.Run("supports metadata as a target", func(t *testing.T) {
+ t.Parallel()
+
+ event := &beat.Event{
+ Fields: common.MapStr{
+ "field1": []byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 74, 73, 77, 206, 207, 45, 40, 74, 45, 46, 78, 77, 81, 72, 73, 44, 73, 4, 4, 0, 0, 255, 255, 108, 158, 105, 19, 17, 0, 0, 0},
+ },
+ Meta: common.MapStr{},
+ }
+
+ expectedMeta := common.MapStr{
+ "field": "decompressed data",
+ }
+
+ config := decompressGzipFieldConfig{
+ Field: fromTo{
+ From: "field1", To: "@metadata.field",
+ },
+ IgnoreMissing: false,
+ FailOnError: true,
+ }
+
+ f := &decompressGzipField{
+ log: logp.NewLogger("decompress_gzip_field"),
+ config: config,
+ }
+
+ newEvent, err := f.Run(event)
+ assert.NoError(t, err)
+
+ assert.Equal(t, expectedMeta, newEvent.Meta)
+ assert.Equal(t, event.Fields, newEvent.Fields)
+ })
}
diff --git a/libbeat/processors/actions/detect_mime_type.go b/libbeat/processors/actions/detect_mime_type.go
index f53794bc1ff..59d7de12ba0 100644
--- a/libbeat/processors/actions/detect_mime_type.go
+++ b/libbeat/processors/actions/detect_mime_type.go
@@ -54,7 +54,7 @@ func NewDetectMimeType(cfg *common.Config) (processors.Processor, error) {
func (m *mimeTypeProcessor) Run(event *beat.Event) (*beat.Event, error) {
valI, err := event.GetValue(m.Field)
if err != nil {
- // doesn't have the required fieldd value to analyze
+ // doesn't have the required field value to analyze
return event, nil
}
val, _ := valI.(string)
@@ -63,11 +63,9 @@ func (m *mimeTypeProcessor) Run(event *beat.Event) (*beat.Event, error) {
return event, nil
}
if mimeType := mime.Detect(val); mimeType != "" {
- event.Fields.DeepUpdate(common.MapStr{
- m.Target: mimeType,
- })
+ _, err = event.PutValue(m.Target, mimeType)
}
- return event, nil
+ return event, err
}
func (m *mimeTypeProcessor) String() string {
diff --git a/libbeat/processors/actions/detect_mime_type_test.go b/libbeat/processors/actions/detect_mime_type_test.go
index 51de6c9062f..1c8bcecbd0f 100644
--- a/libbeat/processors/actions/detect_mime_type_test.go
+++ b/libbeat/processors/actions/detect_mime_type_test.go
@@ -44,6 +44,28 @@ func TestMimeTypeFromTo(t *testing.T) {
require.Equal(t, "text/plain; charset=utf-8", enriched)
}
+func TestMimeTypeFromToMetadata(t *testing.T) {
+ evt := beat.Event{
+ Meta: common.MapStr{},
+ Fields: common.MapStr{
+ "foo.bar.baz": "hello world!",
+ },
+ }
+ expectedMeta := common.MapStr{
+ "field": "text/plain; charset=utf-8",
+ }
+ p, err := NewDetectMimeType(common.MustNewConfigFrom(map[string]interface{}{
+ "field": "foo.bar.baz",
+ "target": "@metadata.field",
+ }))
+ require.NoError(t, err)
+
+ observed, err := p.Run(&evt)
+ require.NoError(t, err)
+ require.Equal(t, expectedMeta, observed.Meta)
+ require.Equal(t, evt.Fields, observed.Fields)
+}
+
func TestMimeTypeTestNoMatch(t *testing.T) {
evt := beat.Event{
Fields: common.MapStr{
diff --git a/libbeat/processors/actions/docs/add_fields.asciidoc b/libbeat/processors/actions/docs/add_fields.asciidoc
index 574c4a684c2..7023948afaf 100644
--- a/libbeat/processors/actions/docs/add_fields.asciidoc
+++ b/libbeat/processors/actions/docs/add_fields.asciidoc
@@ -6,14 +6,14 @@
++++
The `add_fields` processor adds additional fields to the event. Fields can be
-scalar values, arrays, dictionaries, or any nested combination of these.
-The `add_fields` processor will overwrite the target field if it already exists.
+scalar values, arrays, dictionaries, or any nested combination of these.
+The `add_fields` processor will overwrite the target field if it already exists.
By default the fields that you specify will be grouped under the `fields`
sub-dictionary in the event. To group the fields under a different
sub-dictionary, use the `target` setting. To store the fields as
top-level fields, set `target: ''`.
-`target`:: (Optional) Sub-dictionary to put all fields into. Defaults to `fields`.
+`target`:: (Optional) Sub-dictionary to put all fields into. Defaults to `fields`. Setting this to `@metadata` will add values to the event metadata instead of fields.
`fields`:: Fields to be added.
@@ -40,3 +40,16 @@ Adds these fields to any event:
}
}
-------------------------------------------------------------------------------
+
+This configuration will alter the event metadata:
+
+[source,yaml]
+------------------------------------------------------------------------------
+processors:
+ - add_fields:
+ target: '@metadata'
+ fields:
+ op_type: "index"
+------------------------------------------------------------------------------
+
+When the event is ingested (e.g. by Elastisearch) the document will have `op_type: "index"` set as a metadata field.
diff --git a/libbeat/processors/actions/docs/add_tags.asciidoc b/libbeat/processors/actions/docs/add_tags.asciidoc
index e338d246668..2965489839b 100644
--- a/libbeat/processors/actions/docs/add_tags.asciidoc
+++ b/libbeat/processors/actions/docs/add_tags.asciidoc
@@ -9,7 +9,7 @@ The `add_tags` processor adds tags to a list of tags. If the target field alread
the tags are appended to the existing list of tags.
`tags`:: List of tags to add.
-`target`:: (Optional) Field the tags will be added to. Defaults to `tags`.
+`target`:: (Optional) Field the tags will be added to. Defaults to `tags`. Setting tags in `@metadata` is not supported.
For example, this configuration:
diff --git a/libbeat/processors/actions/docs/copy_fields.asciidoc b/libbeat/processors/actions/docs/copy_fields.asciidoc
index 07a56dbf149..98e7862f56d 100644
--- a/libbeat/processors/actions/docs/copy_fields.asciidoc
+++ b/libbeat/processors/actions/docs/copy_fields.asciidoc
@@ -7,7 +7,7 @@
The `copy_fields` processor copies a field to another one.
-`fields`:: List of `from` and `to` pairs to copy from and to.
+`fields`:: List of `from` and `to` pairs to copy from and to. It's supported to use `@metadata.` prefix for `from` and `to` and copy values not just in/from/to the event fields but also in/from/to the event metadata.
`fail_on_error`:: (Optional) If set to `true` and an error occurs, the changes are reverted and the original is returned. If set to `false`,
processing continues if an error occurs. Default is `true`.
`ignore_missing`:: (Optional) Indicates whether to ignore events that lack the source
diff --git a/libbeat/processors/actions/docs/detect_mime_type.asciidoc b/libbeat/processors/actions/docs/detect_mime_type.asciidoc
index c93c6f882e9..d4532f3fe1f 100644
--- a/libbeat/processors/actions/docs/detect_mime_type.asciidoc
+++ b/libbeat/processors/actions/docs/detect_mime_type.asciidoc
@@ -7,7 +7,7 @@
The `detect_mime_type` processor attempts to detect a mime type for a field that
contains a given stream of bytes. The `field` key contains the field used as
-the data source and the `target` key contains the field to populate with the detected type
+the data source and the `target` key contains the field to populate with the detected type. It's supported to use `@metadata.` prefix for `target` and set the value in the event metadata instead of fields.
[source,yaml]
-------
diff --git a/libbeat/processors/actions/docs/include_fields.asciidoc b/libbeat/processors/actions/docs/include_fields.asciidoc
index 86b2c70e6ff..dee585dac90 100644
--- a/libbeat/processors/actions/docs/include_fields.asciidoc
+++ b/libbeat/processors/actions/docs/include_fields.asciidoc
@@ -7,7 +7,7 @@
The `include_fields` processor specifies which fields to export if a certain
condition is fulfilled. The condition is optional. If it's missing, the
-specified fields are always exported. The `@timestamp` and `type` fields are
+specified fields are always exported. The `@timestamp`, `@metadata` and `type` fields are
always exported, even if they are not defined in the `include_fields` list.
[source,yaml]
diff --git a/libbeat/processors/actions/docs/rename.asciidoc b/libbeat/processors/actions/docs/rename.asciidoc
index c78a814a975..55d4738dbaa 100644
--- a/libbeat/processors/actions/docs/rename.asciidoc
+++ b/libbeat/processors/actions/docs/rename.asciidoc
@@ -8,7 +8,7 @@
The `rename` processor specifies a list of fields to rename. Under the `fields`
key, each entry contains a `from: old-key` and a `to: new-key` pair, where:
-* `from` is the original field name
+* `from` is the original field name. It's supported to use `@metadata.` prefix for `from` and rename keys in the event metadata instead of event fields.
* `to` is the target field name
The `rename` processor cannot be used to overwrite fields. To overwrite fields
diff --git a/libbeat/processors/actions/docs/replace.asciidoc b/libbeat/processors/actions/docs/replace.asciidoc
index 2ad206b8e12..9193f4c103c 100644
--- a/libbeat/processors/actions/docs/replace.asciidoc
+++ b/libbeat/processors/actions/docs/replace.asciidoc
@@ -5,16 +5,16 @@
replace
++++
-The `replace` processor takes a list of fields to replace the field value
-matching a pattern with replacement string. Under the `fields` key, each entry
-contains a `field: field-name`, `pattern: regex-pattern` and
+The `replace` processor takes a list of fields to replace the field value
+matching a pattern with replacement string. Under the `fields` key, each entry
+contains a `field: field-name`, `pattern: regex-pattern` and
`replacement: replacement-string`, where:
-* `field` is the original field name
+* `field` is the original field name. It's supported to use `@metadata.` prefix for the fields and replace values in the event metadata instead of event fields.
* `pattern` is the regex pattern to match the field's value
* `replacement` is the replacement string to use to update the field's value
-The `replace` processor cannot be used to replace value with a completely new value.
+The `replace` processor cannot be used to replace value with a completely new value.
TIP: The `replacement` field value can be used to truncate the `field` value or replace
it with a new string. It can also be used for masking PII information.
@@ -35,7 +35,7 @@ processors:
The `replace` processor has following configuration settings:
-`ignore_missing`:: (Optional) If set to `true`, no error is logged if the specified field
+`ignore_missing`:: (Optional) If set to `true`, no error is logged if the specified field
is missing. The default is `false`.
`fail_on_error`:: (Optional) If set to `true` and there's an error, the replacement of
diff --git a/libbeat/processors/actions/docs/truncate_fields.asciidoc b/libbeat/processors/actions/docs/truncate_fields.asciidoc
index 667c5fb9884..d2e6c484710 100644
--- a/libbeat/processors/actions/docs/truncate_fields.asciidoc
+++ b/libbeat/processors/actions/docs/truncate_fields.asciidoc
@@ -8,7 +8,7 @@
The `truncate_fields` processor truncates a field to a given size. If the size of the field is smaller than
the limit, the field is left as is.
-`fields`:: List of fields to truncate.
+`fields`:: List of fields to truncate. It's supported to use `@metadata.` prefix for the fields and truncate values in the event metadata instead of event fields.
`max_bytes`:: Maximum number of bytes in a field. Mutually exclusive with `max_characters`.
`max_characters`:: Maximum number of characters in a field. Mutually exclusive with `max_bytes`.
`fail_on_error`:: (Optional) If set to true, in case of an error the changes to
diff --git a/libbeat/processors/actions/drop_fields_test.go b/libbeat/processors/actions/drop_fields_test.go
new file mode 100644
index 00000000000..8125564a6e1
--- /dev/null
+++ b/libbeat/processors/actions/drop_fields_test.go
@@ -0,0 +1,60 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package actions
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/elastic/beats/v7/libbeat/beat"
+ "github.com/elastic/beats/v7/libbeat/common"
+)
+
+func TestDropFieldRun(t *testing.T) {
+ event := &beat.Event{
+ Fields: common.MapStr{
+ "field": "value",
+ },
+ Meta: common.MapStr{
+ "meta_field": "value",
+ },
+ }
+
+ t.Run("supports a normal field", func(t *testing.T) {
+ p := dropFields{
+ Fields: []string{"field"},
+ }
+
+ newEvent, err := p.Run(event)
+ assert.NoError(t, err)
+ assert.Equal(t, common.MapStr{}, newEvent.Fields)
+ assert.Equal(t, event.Meta, newEvent.Meta)
+ })
+
+ t.Run("supports a metadata field", func(t *testing.T) {
+ p := dropFields{
+ Fields: []string{"@metadata.meta_field"},
+ }
+
+ newEvent, err := p.Run(event)
+ assert.NoError(t, err)
+ assert.Equal(t, common.MapStr{}, newEvent.Meta)
+ assert.Equal(t, event.Fields, newEvent.Fields)
+ })
+}
diff --git a/libbeat/processors/actions/extract_field_test.go b/libbeat/processors/actions/extract_field_test.go
index be466a39da2..4e95b66fd28 100644
--- a/libbeat/processors/actions/extract_field_test.go
+++ b/libbeat/processors/actions/extract_field_test.go
@@ -106,6 +106,39 @@ func TestCommonPaths(t *testing.T) {
// Event must be present, even on error
assert.NotNil(t, event)
}
+
+ t.Run("supports a metadata field", func(t *testing.T) {
+ var config, _ = common.NewConfigFrom(map[string]interface{}{
+ "field": "field",
+ "separator": "/",
+ "index": 3,
+ "target": "@metadata.field",
+ })
+
+ event := &beat.Event{
+ Meta: common.MapStr{},
+ Fields: common.MapStr{
+ "field": "/var/lib/foo/bar",
+ },
+ }
+
+ expectedFields := common.MapStr{
+ "field": "/var/lib/foo/bar",
+ }
+ expectedMeta := common.MapStr{
+ "field": "bar",
+ }
+
+ p, err := NewExtractField(config)
+ if err != nil {
+ t.Fatalf("error initializing extract_field: %s", err)
+ }
+
+ newEvent, err := p.Run(event)
+ assert.NoError(t, err)
+ assert.Equal(t, expectedFields, newEvent.Fields)
+ assert.Equal(t, expectedMeta, newEvent.Meta)
+ })
}
func runExtractField(t *testing.T, config *common.Config, input common.MapStr) (*beat.Event, error) {
diff --git a/libbeat/processors/actions/rename.go b/libbeat/processors/actions/rename.go
index 8be084a281b..170888d0b53 100644
--- a/libbeat/processors/actions/rename.go
+++ b/libbeat/processors/actions/rename.go
@@ -73,19 +73,19 @@ func NewRenameFields(c *common.Config) (processors.Processor, error) {
}
func (f *renameFields) Run(event *beat.Event) (*beat.Event, error) {
- var backup common.MapStr
+ var backup *beat.Event
// Creates a copy of the event to revert in case of failure
if f.config.FailOnError {
- backup = event.Fields.Clone()
+ backup = event.Clone()
}
for _, field := range f.config.Fields {
- err := f.renameField(field.From, field.To, event.Fields)
+ err := f.renameField(field.From, field.To, event)
if err != nil {
errMsg := fmt.Errorf("Failed to rename fields in processor: %s", err)
f.logger.Debug(errMsg.Error())
if f.config.FailOnError {
- event.Fields = backup
+ event = backup
event.PutValue("error.message", errMsg.Error())
return event, err
}
@@ -95,14 +95,14 @@ func (f *renameFields) Run(event *beat.Event) (*beat.Event, error) {
return event, nil
}
-func (f *renameFields) renameField(from string, to string, fields common.MapStr) error {
+func (f *renameFields) renameField(from string, to string, event *beat.Event) error {
// Fields cannot be overwritten. Either the target field has to be dropped first or renamed first
- exists, _ := fields.HasKey(to)
- if exists {
+ _, err := event.GetValue(to)
+ if err == nil {
return fmt.Errorf("target field %s already exists, drop or rename this field first", to)
}
- value, err := fields.GetValue(from)
+ value, err := event.GetValue(from)
if err != nil {
// Ignore ErrKeyNotFound errors
if f.config.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound {
@@ -112,12 +112,12 @@ func (f *renameFields) renameField(from string, to string, fields common.MapStr)
}
// Deletion must happen first to support cases where a becomes a.b
- err = fields.Delete(from)
+ err = event.Delete(from)
if err != nil {
return fmt.Errorf("could not delete key: %s, %+v", from, err)
}
- _, err = fields.Put(to, value)
+ _, err = event.PutValue(to, value)
if err != nil {
return fmt.Errorf("could not put value: %s: %v, %v", to, value, err)
}
diff --git a/libbeat/processors/actions/rename_test.go b/libbeat/processors/actions/rename_test.go
index 53c5a41d5bf..af9dfdd10c6 100644
--- a/libbeat/processors/actions/rename_test.go
+++ b/libbeat/processors/actions/rename_test.go
@@ -358,7 +358,7 @@ func TestRenameField(t *testing.T) {
},
}
- err := f.renameField(test.From, test.To, test.Input)
+ err := f.renameField(test.From, test.To, &beat.Event{Fields: test.Input})
if err != nil {
assert.Equal(t, test.error, true)
}
@@ -366,4 +366,32 @@ func TestRenameField(t *testing.T) {
assert.True(t, reflect.DeepEqual(test.Input, test.Output))
})
}
+
+ t.Run("supports metadata as a target", func(t *testing.T) {
+ event := &beat.Event{
+ Meta: common.MapStr{
+ "a": "c",
+ },
+ }
+
+ expMeta := common.MapStr{
+ "b": "c",
+ }
+
+ f := &renameFields{
+ config: renameFieldsConfig{
+ Fields: []fromTo{
+ {
+ From: "@metadata.a",
+ To: "@metadata.b",
+ },
+ },
+ },
+ }
+
+ newEvent, err := f.Run(event)
+ assert.NoError(t, err)
+ assert.Equal(t, expMeta, newEvent.Meta)
+ assert.Equal(t, event.Fields, newEvent.Fields)
+ })
}
diff --git a/libbeat/processors/actions/replace.go b/libbeat/processors/actions/replace.go
index 37245817050..9dfd84ba9a8 100644
--- a/libbeat/processors/actions/replace.go
+++ b/libbeat/processors/actions/replace.go
@@ -73,19 +73,19 @@ func NewReplaceString(c *common.Config) (processors.Processor, error) {
}
func (f *replaceString) Run(event *beat.Event) (*beat.Event, error) {
- var backup common.MapStr
+ var backup *beat.Event
// Creates a copy of the event to revert in case of failure
if f.config.FailOnError {
- backup = event.Fields.Clone()
+ backup = event.Clone()
}
for _, field := range f.config.Fields {
- err := f.replaceField(field.Field, field.Pattern, field.Replacement, event.Fields)
+ err := f.replaceField(field.Field, field.Pattern, field.Replacement, event)
if err != nil {
errMsg := fmt.Errorf("Failed to replace fields in processor: %s", err)
logp.Debug("replace", errMsg.Error())
if f.config.FailOnError {
- event.Fields = backup
+ event = backup
event.PutValue("error.message", errMsg.Error())
return event, err
}
@@ -95,8 +95,8 @@ func (f *replaceString) Run(event *beat.Event) (*beat.Event, error) {
return event, nil
}
-func (f *replaceString) replaceField(field string, pattern *regexp.Regexp, replacement string, fields common.MapStr) error {
- currentValue, err := fields.GetValue(field)
+func (f *replaceString) replaceField(field string, pattern *regexp.Regexp, replacement string, event *beat.Event) error {
+ currentValue, err := event.GetValue(field)
if err != nil {
// Ignore ErrKeyNotFound errors
if f.config.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound {
@@ -106,7 +106,7 @@ func (f *replaceString) replaceField(field string, pattern *regexp.Regexp, repla
}
updatedString := pattern.ReplaceAllString(currentValue.(string), replacement)
- _, err = fields.Put(field, updatedString)
+ _, err = event.PutValue(field, updatedString)
if err != nil {
return fmt.Errorf("could not put value: %s: %v, %v", replacement, currentValue, err)
}
diff --git a/libbeat/processors/actions/replace_test.go b/libbeat/processors/actions/replace_test.go
index aa95b4fd9ea..bf54d528583 100644
--- a/libbeat/processors/actions/replace_test.go
+++ b/libbeat/processors/actions/replace_test.go
@@ -237,7 +237,7 @@ func TestReplaceField(t *testing.T) {
},
}
- err := f.replaceField(test.Field, test.Pattern, test.Replacement, test.Input)
+ err := f.replaceField(test.Field, test.Pattern, test.Replacement, &beat.Event{Fields: test.Input})
if err != nil {
assert.Equal(t, test.error, true)
}
@@ -245,4 +245,33 @@ func TestReplaceField(t *testing.T) {
assert.True(t, reflect.DeepEqual(test.Input, test.Output))
})
}
+
+ t.Run("supports metadata as a target", func(t *testing.T) {
+ event := &beat.Event{
+ Meta: common.MapStr{
+ "f": "abc",
+ },
+ }
+
+ expectedMeta := common.MapStr{
+ "f": "bbc",
+ }
+
+ f := &replaceString{
+ config: replaceStringConfig{
+ Fields: []replaceConfig{
+ {
+ Field: "@metadata.f",
+ Pattern: regexp.MustCompile(`a`),
+ Replacement: "b",
+ },
+ },
+ },
+ }
+
+ newEvent, err := f.Run(event)
+ assert.NoError(t, err)
+ assert.Equal(t, expectedMeta, newEvent.Meta)
+ assert.Equal(t, event.Fields, newEvent.Fields)
+ })
}
diff --git a/libbeat/processors/actions/truncate_fields.go b/libbeat/processors/actions/truncate_fields.go
index ead07724fab..227bbee10fc 100644
--- a/libbeat/processors/actions/truncate_fields.go
+++ b/libbeat/processors/actions/truncate_fields.go
@@ -82,9 +82,9 @@ func NewTruncateFields(c *common.Config) (processors.Processor, error) {
}
func (f *truncateFields) Run(event *beat.Event) (*beat.Event, error) {
- var backup common.MapStr
+ var backup *beat.Event
if f.config.FailOnError {
- backup = event.Fields.Clone()
+ backup = event.Clone()
}
for _, field := range f.config.Fields {
@@ -92,7 +92,7 @@ func (f *truncateFields) Run(event *beat.Event) (*beat.Event, error) {
if err != nil {
f.logger.Debugf("Failed to truncate fields: %s", err)
if f.config.FailOnError {
- event.Fields = backup
+ event = backup
return event, err
}
}
diff --git a/libbeat/processors/actions/truncate_fields_test.go b/libbeat/processors/actions/truncate_fields_test.go
index c19bc4e27ff..aac7ee662cc 100644
--- a/libbeat/processors/actions/truncate_fields_test.go
+++ b/libbeat/processors/actions/truncate_fields_test.go
@@ -178,4 +178,39 @@ func TestTruncateFields(t *testing.T) {
assert.Equal(t, test.Output, newEvent.Fields)
})
}
+
+ t.Run("supports metadata as a target", func(t *testing.T) {
+ p := truncateFields{
+ config: truncateFieldsConfig{
+ Fields: []string{"@metadata.message"},
+ MaxBytes: 3,
+ FailOnError: true,
+ },
+ truncate: (*truncateFields).truncateBytes,
+ logger: log,
+ }
+
+ event := &beat.Event{
+ Meta: common.MapStr{
+ "message": "too long line",
+ },
+ Fields: common.MapStr{},
+ }
+
+ expFields := common.MapStr{
+ "log": common.MapStr{
+ "flags": []string{"truncated"},
+ },
+ }
+
+ expMeta := common.MapStr{
+ "message": "too",
+ }
+
+ newEvent, err := p.Run(event)
+ assert.NoError(t, err)
+
+ assert.Equal(t, expFields, newEvent.Fields)
+ assert.Equal(t, expMeta, newEvent.Meta)
+ })
}
diff --git a/libbeat/processors/add_id/add_id_test.go b/libbeat/processors/add_id/add_id_test.go
index 54ad579e9fd..a50349f22f0 100644
--- a/libbeat/processors/add_id/add_id_test.go
+++ b/libbeat/processors/add_id/add_id_test.go
@@ -63,3 +63,25 @@ func TestNonDefaultTargetField(t *testing.T) {
assert.NoError(t, err)
assert.Empty(t, v)
}
+
+func TestNonDefaultMetadataTarget(t *testing.T) {
+ cfg := common.MustNewConfigFrom(common.MapStr{
+ "target_field": "@metadata.foo",
+ })
+ p, err := New(cfg)
+ assert.NoError(t, err)
+
+ testEvent := &beat.Event{
+ Meta: common.MapStr{},
+ }
+
+ newEvent, err := p.Run(testEvent)
+ assert.NoError(t, err)
+
+ v, err := newEvent.Meta.GetValue("foo")
+ assert.NoError(t, err)
+ assert.NotEmpty(t, v)
+
+ v, err = newEvent.GetValue("@metadata._id")
+ assert.Error(t, err)
+}
diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go
index 2730770bc86..cdb182e4d31 100644
--- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go
+++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go
@@ -254,6 +254,9 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *common.Confi
})
}
+// Run runs the processor that adds a field `kubernetes` to the event fields that
+// contains a map with various Kubernetes metadata.
+// This processor does not access or modify the `Meta` of the event.
func (k *kubernetesAnnotator) Run(event *beat.Event) (*beat.Event, error) {
if !k.kubernetesAvailable {
return event, nil
diff --git a/libbeat/processors/add_process_metadata/add_process_metadata.go b/libbeat/processors/add_process_metadata/add_process_metadata.go
index 31b9fa9a9db..e6f745e6e91 100644
--- a/libbeat/processors/add_process_metadata/add_process_metadata.go
+++ b/libbeat/processors/add_process_metadata/add_process_metadata.go
@@ -159,10 +159,10 @@ func containsValue(m common.MapStr, v string) bool {
return false
}
-// Run enriches the given event with the host meta data
+// Run enriches the given event with the host meta data.
func (p *addProcessMetadata) Run(event *beat.Event) (*beat.Event, error) {
for _, pidField := range p.config.MatchPIDs {
- result, err := p.enrich(event.Fields, pidField)
+ result, err := p.enrich(event, pidField)
if err != nil {
switch err {
case common.ErrKeyNotFound:
@@ -174,7 +174,7 @@ func (p *addProcessMetadata) Run(event *beat.Event) (*beat.Event, error) {
}
}
if result != nil {
- event.Fields = result
+ event = result
}
return event, nil
}
@@ -209,7 +209,7 @@ func pidToInt(value interface{}) (pid int, err error) {
return pid, nil
}
-func (p *addProcessMetadata) enrich(event common.MapStr, pidField string) (result common.MapStr, err error) {
+func (p *addProcessMetadata) enrich(event *beat.Event, pidField string) (result *beat.Event, err error) {
pidIf, err := event.GetValue(pidField)
if err != nil {
return nil, err
@@ -253,7 +253,7 @@ func (p *addProcessMetadata) enrich(event common.MapStr, pidField string) (resul
return nil, errors.New("source is not a string")
}
if !p.config.OverwriteKeys {
- if found, _ := result.HasKey(dest); found {
+ if _, err := result.GetValue(dest); err == nil {
return nil, errors.Errorf("target field '%s' already exists and overwrite_keys is false", dest)
}
}
@@ -264,7 +264,7 @@ func (p *addProcessMetadata) enrich(event common.MapStr, pidField string) (resul
continue
}
- if _, err = result.Put(dest, value); err != nil {
+ if _, err = result.PutValue(dest, value); err != nil {
return nil, err
}
}
diff --git a/libbeat/processors/add_process_metadata/add_process_metadata_test.go b/libbeat/processors/add_process_metadata/add_process_metadata_test.go
index 51f4a33172e..461a71efa3d 100644
--- a/libbeat/processors/add_process_metadata/add_process_metadata_test.go
+++ b/libbeat/processors/add_process_metadata/add_process_metadata_test.go
@@ -751,6 +751,42 @@ func TestAddProcessMetadata(t *testing.T) {
}
})
}
+
+ t.Run("supports metadata as a target", func(t *testing.T) {
+ c := common.MapStr{
+ "match_pids": []string{"@metadata.system.ppid"},
+ "target": "@metadata",
+ "include_fields": []string{"process.name"},
+ }
+
+ config, err := common.NewConfigFrom(c)
+ assert.NoError(t, err)
+
+ proc, err := newProcessMetadataProcessorWithProvider(config, testProcs, true)
+ assert.NoError(t, err)
+
+ event := &beat.Event{
+ Meta: common.MapStr{
+ "system": common.MapStr{
+ "ppid": "1",
+ },
+ },
+ Fields: common.MapStr{},
+ }
+ expMeta := common.MapStr{
+ "system": common.MapStr{
+ "ppid": "1",
+ },
+ "process": common.MapStr{
+ "name": "systemd",
+ },
+ }
+
+ result, err := proc.Run(event)
+ assert.NoError(t, err)
+ assert.Equal(t, expMeta, result.Meta)
+ assert.Equal(t, event.Fields, result.Fields)
+ })
}
func TestUsingCache(t *testing.T) {
diff --git a/libbeat/processors/communityid/communityid_test.go b/libbeat/processors/communityid/communityid_test.go
index 2356b005cea..0ab643d05de 100644
--- a/libbeat/processors/communityid/communityid_test.go
+++ b/libbeat/processors/communityid/communityid_test.go
@@ -147,6 +147,26 @@ func TestRun(t *testing.T) {
e.Put("network.iana_number", tcpProtocol)
testProcessor(t, 0, e, "1:LQU9qZlK+B5F3KDmev6m5PMibrg=")
})
+
+ t.Run("supports metadata as a target", func(t *testing.T) {
+ event := &beat.Event{
+ Fields: evt(),
+ Meta: common.MapStr{},
+ }
+ c := defaultConfig()
+ c.Target = "@metadata.community_id"
+ c.Seed = 0
+ p, err := newFromConfig(c)
+ assert.NoError(t, err)
+
+ out, err := p.Run(event)
+ assert.NoError(t, err)
+
+ id, err := out.Meta.GetValue("community_id")
+ assert.NoError(t, err)
+
+ assert.EqualValues(t, "1:LQU9qZlK+B5F3KDmev6m5PMibrg=", id)
+ })
}
func testProcessor(t testing.TB, seed uint16, fields common.MapStr, expectedHash interface{}) {
diff --git a/libbeat/processors/convert/convert.go b/libbeat/processors/convert/convert.go
index b781f37ed4a..999a5abd418 100644
--- a/libbeat/processors/convert/convert.go
+++ b/libbeat/processors/convert/convert.go
@@ -80,19 +80,19 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) {
}
// Backup original event.
- saved := *event
+ saved := event
+
if len(p.Fields) > 1 && p.FailOnError {
// Clone the fields to allow the processor to undo the operation on
// failure (like a transaction). If there is only one conversion then
// cloning is unnecessary because there are no previous changes to
// rollback (so avoid the expensive clone operation).
- saved.Fields = event.Fields.Clone()
- saved.Meta = event.Meta.Clone()
+ saved = event.Clone()
}
// Update the event with the converted values.
if err := p.writeToEvent(event, converted); err != nil {
- return &saved, err
+ return saved, err
}
return event, nil
diff --git a/libbeat/processors/convert/convert_test.go b/libbeat/processors/convert/convert_test.go
index f761b047f4c..ea57413562f 100644
--- a/libbeat/processors/convert/convert_test.go
+++ b/libbeat/processors/convert/convert_test.go
@@ -146,6 +146,30 @@ func TestConvert(t *testing.T) {
`"Tag":"convert_ip","IgnoreMissing":false,"FailOnError":true,"Mode":"copy"}`,
p.String())
})
+
+ t.Run("metadata as a target", func(t *testing.T) {
+ c := defaultConfig()
+ c.Tag = "convert_ip"
+ c.Fields = append(c.Fields, field{From: "@metadata.source", To: "@metadata.dest", Type: Integer})
+
+ evt := &beat.Event{
+ Meta: common.MapStr{
+ "source": "1",
+ },
+ }
+ expMeta := common.MapStr{
+ "source": "1",
+ "dest": int32(1),
+ }
+
+ p, err := newConvert(c)
+ assert.NoError(t, err)
+
+ newEvt, err := p.Run(evt)
+ assert.NoError(t, err)
+ assert.Equal(t, expMeta, newEvt.Meta)
+ assert.Equal(t, evt.Fields, newEvt.Fields)
+ })
}
func TestConvertRun(t *testing.T) {
diff --git a/libbeat/processors/decode_csv_fields/decode_csv_fields.go b/libbeat/processors/decode_csv_fields/decode_csv_fields.go
index ff2cdf0938c..f971e7e8f5b 100644
--- a/libbeat/processors/decode_csv_fields/decode_csv_fields.go
+++ b/libbeat/processors/decode_csv_fields/decode_csv_fields.go
@@ -100,14 +100,13 @@ func NewDecodeCSVField(c *common.Config) (processors.Processor, error) {
// Run applies the decode_csv_field processor to an event.
func (f *decodeCSVFields) Run(event *beat.Event) (*beat.Event, error) {
- saved := *event
+ var saved *beat.Event
if f.FailOnError {
- saved.Fields = event.Fields.Clone()
- saved.Meta = event.Meta.Clone()
+ saved = event.Clone()
}
for src, dest := range f.fields {
if err := f.decodeCSVField(src, dest, event); err != nil && f.FailOnError {
- return &saved, err
+ return saved, err
}
}
return event, nil
diff --git a/libbeat/processors/decode_csv_fields/decode_csv_fields_test.go b/libbeat/processors/decode_csv_fields/decode_csv_fields_test.go
index 8a406ca06e6..e85d91873cf 100644
--- a/libbeat/processors/decode_csv_fields/decode_csv_fields_test.go
+++ b/libbeat/processors/decode_csv_fields/decode_csv_fields_test.go
@@ -338,6 +338,37 @@ func TestDecodeCSVField(t *testing.T) {
assert.NoError(t, err)
})
}
+
+ t.Run("supports metadata as a target", func(t *testing.T) {
+ config := common.MapStr{
+ "fields": common.MapStr{
+ "@metadata": common.MapStr{
+ "field": "@metadata.message",
+ },
+ },
+ }
+
+ event := &beat.Event{
+ Meta: common.MapStr{
+ "field": "17,192.168.33.1,8.8.8.8",
+ },
+ Fields: common.MapStr{},
+ }
+ expMeta := common.MapStr{
+ "field": "17,192.168.33.1,8.8.8.8",
+ "message": []string{"17", "192.168.33.1", "8.8.8.8"},
+ }
+
+ processor, err := NewDecodeCSVField(common.MustNewConfigFrom(config))
+ if err != nil {
+ t.Fatal(err)
+ }
+ result, err := processor.Run(event)
+ assert.NoError(t, err)
+ assert.Equal(t, expMeta, result.Meta)
+ assert.Equal(t, event.Fields, result.Fields)
+ })
+
}
func TestDecodeCSVField_String(t *testing.T) {
diff --git a/libbeat/processors/decode_xml/decode_xml_test.go b/libbeat/processors/decode_xml/decode_xml_test.go
index 83ef61ea226..5a86787ae46 100644
--- a/libbeat/processors/decode_xml/decode_xml_test.go
+++ b/libbeat/processors/decode_xml/decode_xml_test.go
@@ -402,6 +402,54 @@ func TestDecodeXML(t *testing.T) {
assert.Equal(t, test.Output, newEvent.Fields)
})
}
+
+ t.Run("supports metadata as a target", func(t *testing.T) {
+ t.Parallel()
+ target := "@metadata.xml"
+ config := decodeXMLConfig{
+ Field: "@metadata.message",
+ Target: &target,
+ }
+
+ f, err := newDecodeXML(config)
+ require.NoError(t, err)
+
+ event := &beat.Event{
+ Meta: common.MapStr{
+ "message": `
+
+ William H. Gaddis
+ The Recognitions
+ One of the great seminal American novels of the 20th century.
+
+ `,
+ },
+ }
+ expMeta := common.MapStr{
+ "xml": common.MapStr{
+ "catalog": map[string]interface{}{
+ "book": map[string]interface{}{
+ "author": "William H. Gaddis",
+ "review": "One of the great seminal American novels of the 20th century.",
+ "seq": "1",
+ "title": "The Recognitions",
+ },
+ },
+ },
+ "message": `
+
+ William H. Gaddis
+ The Recognitions
+ One of the great seminal American novels of the 20th century.
+
+ `,
+ }
+
+ newEvent, err := f.Run(event)
+ assert.NoError(t, err)
+ assert.Equal(t, expMeta, newEvent.Meta)
+ assert.Equal(t, event.Fields, newEvent.Fields)
+ })
}
func BenchmarkProcessor_Run(b *testing.B) {
diff --git a/libbeat/processors/decode_xml_wineventlog/processor_test.go b/libbeat/processors/decode_xml_wineventlog/processor_test.go
index 1aef817968f..4fc3c92d4e7 100644
--- a/libbeat/processors/decode_xml_wineventlog/processor_test.go
+++ b/libbeat/processors/decode_xml_wineventlog/processor_test.go
@@ -28,6 +28,68 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
)
+var testMessage = "" +
+ "4672001254800x8020000000000000" +
+ "11303Securityvagrant" +
+ "S-1-5-18SYSTEMNT AUTHORITY0x3e7" +
+ "SeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\t" +
+ "SeRestorePrivilege\n\t\t\tSeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilege" +
+ "Special privileges assigned to new logon.\n\nSubject:\n\tSecurity ID:\t\tS-1-5-18\n\tAccount Name:\t\tSYSTEM\n\tAccount Domain:\t\tNT AUTHORITY\n\tLogon ID:\t\t0x3E7\n\n" +
+ "Privileges:\t\tSeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\tSeRestorePrivilege\n\t\t\t" +
+ "SeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilegeInformation" +
+ "Special LogonInfoSecurityMicrosoft Windows security auditing.Audit Success"
+
+var testMessageOutput = common.MapStr{
+ "event": common.MapStr{
+ "action": "Special Logon",
+ "code": "4672",
+ "kind": "event",
+ "outcome": "success",
+ "provider": "Microsoft-Windows-Security-Auditing",
+ },
+ "host": common.MapStr{
+ "name": "vagrant",
+ },
+ "log": common.MapStr{
+ "level": "information",
+ },
+ "winlog": common.MapStr{
+ "channel": "Security",
+ "outcome": "success",
+ "activity_id": "{ffb23523-1f32-0000-c335-b2ff321fd701}",
+ "level": "information",
+ "event_id": "4672",
+ "provider_name": "Microsoft-Windows-Security-Auditing",
+ "record_id": uint64(11303),
+ "computer_name": "vagrant",
+ "time_created": func() time.Time {
+ t, _ := time.Parse(time.RFC3339Nano, "2021-03-23T09:56:13.137310000Z")
+ return t
+ }(),
+ "opcode": "Info",
+ "provider_guid": "{54849625-5478-4994-a5ba-3e3b0328c30d}",
+ "event_data": common.MapStr{
+ "SubjectUserSid": "S-1-5-18",
+ "SubjectUserName": "SYSTEM",
+ "SubjectDomainName": "NT AUTHORITY",
+ "SubjectLogonId": "0x3e7",
+ "PrivilegeList": "SeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\tSeRestorePrivilege\n\t\t\tSeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilege",
+ },
+ "task": "Special Logon",
+ "keywords": []string{
+ "Audit Success",
+ },
+ "message": "Special privileges assigned to new logon.\n\nSubject:\n\tSecurity ID:\t\tS-1-5-18\n\tAccount Name:\t\tSYSTEM\n\tAccount Domain:\t\tNT AUTHORITY\n\tLogon ID:\t\t0x3E7\n\nPrivileges:\t\tSeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\tSeRestorePrivilege\n\t\t\tSeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilege",
+ "process": common.MapStr{
+ "pid": uint32(652),
+ "thread": common.MapStr{
+ "id": uint32(4660),
+ },
+ },
+ },
+ "message": "Special privileges assigned to new logon.\n\nSubject:\n\tSecurity ID:\t\tS-1-5-18\n\tAccount Name:\t\tSYSTEM\n\tAccount Domain:\t\tNT AUTHORITY\n\tLogon ID:\t\t0x3E7\n\nPrivileges:\t\tSeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\tSeRestorePrivilege\n\t\t\tSeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilege",
+}
+
func TestProcessor(t *testing.T) {
var testCases = []struct {
description string
@@ -41,67 +103,9 @@ func TestProcessor(t *testing.T) {
description: "Decodes properly with default config",
config: defaultConfig(),
Input: common.MapStr{
- "message": "" +
- "4672001254800x8020000000000000" +
- "11303Securityvagrant" +
- "S-1-5-18SYSTEMNT AUTHORITY0x3e7" +
- "SeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\t" +
- "SeRestorePrivilege\n\t\t\tSeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilege" +
- "Special privileges assigned to new logon.\n\nSubject:\n\tSecurity ID:\t\tS-1-5-18\n\tAccount Name:\t\tSYSTEM\n\tAccount Domain:\t\tNT AUTHORITY\n\tLogon ID:\t\t0x3E7\n\n" +
- "Privileges:\t\tSeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\tSeRestorePrivilege\n\t\t\t" +
- "SeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilegeInformation" +
- "Special LogonInfoSecurityMicrosoft Windows security auditing.Audit Success",
- },
- Output: common.MapStr{
- "event": common.MapStr{
- "action": "Special Logon",
- "code": "4672",
- "kind": "event",
- "outcome": "success",
- "provider": "Microsoft-Windows-Security-Auditing",
- },
- "host": common.MapStr{
- "name": "vagrant",
- },
- "log": common.MapStr{
- "level": "information",
- },
- "winlog": common.MapStr{
- "channel": "Security",
- "outcome": "success",
- "activity_id": "{ffb23523-1f32-0000-c335-b2ff321fd701}",
- "level": "information",
- "event_id": "4672",
- "provider_name": "Microsoft-Windows-Security-Auditing",
- "record_id": uint64(11303),
- "computer_name": "vagrant",
- "time_created": func() time.Time {
- t, _ := time.Parse(time.RFC3339Nano, "2021-03-23T09:56:13.137310000Z")
- return t
- }(),
- "opcode": "Info",
- "provider_guid": "{54849625-5478-4994-a5ba-3e3b0328c30d}",
- "event_data": common.MapStr{
- "SubjectUserSid": "S-1-5-18",
- "SubjectUserName": "SYSTEM",
- "SubjectDomainName": "NT AUTHORITY",
- "SubjectLogonId": "0x3e7",
- "PrivilegeList": "SeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\tSeRestorePrivilege\n\t\t\tSeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilege",
- },
- "task": "Special Logon",
- "keywords": []string{
- "Audit Success",
- },
- "message": "Special privileges assigned to new logon.\n\nSubject:\n\tSecurity ID:\t\tS-1-5-18\n\tAccount Name:\t\tSYSTEM\n\tAccount Domain:\t\tNT AUTHORITY\n\tLogon ID:\t\t0x3E7\n\nPrivileges:\t\tSeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\tSeRestorePrivilege\n\t\t\tSeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilege",
- "process": common.MapStr{
- "pid": uint32(652),
- "thread": common.MapStr{
- "id": uint32(4660),
- },
- },
- },
- "message": "Special privileges assigned to new logon.\n\nSubject:\n\tSecurity ID:\t\tS-1-5-18\n\tAccount Name:\t\tSYSTEM\n\tAccount Domain:\t\tNT AUTHORITY\n\tLogon ID:\t\t0x3E7\n\nPrivileges:\t\tSeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\tSeRestorePrivilege\n\t\t\tSeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilege",
+ "message": testMessage,
},
+ Output: testMessageOutput,
},
{
description: "Decodes without ECS",
@@ -112,62 +116,11 @@ func TestProcessor(t *testing.T) {
Target: "winlog",
},
Input: common.MapStr{
- "message": "" +
- "4672001254800x8020000000000000" +
- "11303Securityvagrant" +
- "S-1-5-18SYSTEMNT AUTHORITY0x3e7" +
- "SeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\t" +
- "SeRestorePrivilege\n\t\t\tSeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilege" +
- "Special privileges assigned to new logon.\n\nSubject:\n\tSecurity ID:\t\tS-1-5-18\n\tAccount Name:\t\tSYSTEM\n\tAccount Domain:\t\tNT AUTHORITY\n\tLogon ID:\t\t0x3E7\n\n" +
- "Privileges:\t\tSeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\tSeRestorePrivilege\n\t\t\t" +
- "SeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilegeInformation" +
- "Special LogonInfoSecurityMicrosoft Windows security auditing.Audit Success",
+ "message": testMessage,
},
Output: common.MapStr{
- "winlog": common.MapStr{
- "channel": "Security",
- "outcome": "success",
- "activity_id": "{ffb23523-1f32-0000-c335-b2ff321fd701}",
- "level": "information",
- "event_id": "4672",
- "provider_name": "Microsoft-Windows-Security-Auditing",
- "record_id": uint64(11303),
- "computer_name": "vagrant",
- "time_created": func() time.Time {
- t, _ := time.Parse(time.RFC3339Nano, "2021-03-23T09:56:13.137310000Z")
- return t
- }(),
- "opcode": "Info",
- "provider_guid": "{54849625-5478-4994-a5ba-3e3b0328c30d}",
- "event_data": common.MapStr{
- "SubjectUserSid": "S-1-5-18",
- "SubjectUserName": "SYSTEM",
- "SubjectDomainName": "NT AUTHORITY",
- "SubjectLogonId": "0x3e7",
- "PrivilegeList": "SeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\tSeRestorePrivilege\n\t\t\tSeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilege",
- },
- "task": "Special Logon",
- "keywords": []string{
- "Audit Success",
- },
- "message": "Special privileges assigned to new logon.\n\nSubject:\n\tSecurity ID:\t\tS-1-5-18\n\tAccount Name:\t\tSYSTEM\n\tAccount Domain:\t\tNT AUTHORITY\n\tLogon ID:\t\t0x3E7\n\nPrivileges:\t\tSeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\tSeRestorePrivilege\n\t\t\tSeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilege",
- "process": common.MapStr{
- "pid": uint32(652),
- "thread": common.MapStr{
- "id": uint32(4660),
- },
- },
- },
- "message": "" +
- "4672001254800x8020000000000000" +
- "11303Securityvagrant" +
- "S-1-5-18SYSTEMNT AUTHORITY0x3e7" +
- "SeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\t" +
- "SeRestorePrivilege\n\t\t\tSeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilege" +
- "Special privileges assigned to new logon.\n\nSubject:\n\tSecurity ID:\t\tS-1-5-18\n\tAccount Name:\t\tSYSTEM\n\tAccount Domain:\t\tNT AUTHORITY\n\tLogon ID:\t\t0x3E7\n\n" +
- "Privileges:\t\tSeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\tSeRestorePrivilege\n\t\t\t" +
- "SeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilegeInformation" +
- "Special LogonInfoSecurityMicrosoft Windows security auditing.Audit Success",
+ "winlog": testMessageOutput["winlog"],
+ "message": testMessage,
},
},
}
@@ -194,4 +147,31 @@ func TestProcessor(t *testing.T) {
assert.Equal(t, test.Output, newEvent.Fields)
})
}
+
+ t.Run("supports metadata as a target", func(t *testing.T) {
+ t.Parallel()
+
+ config := defaultConfig()
+ config.Field = "@metadata.message"
+ config.Target = "@metadata.target"
+
+ f, err := newProcessor(config)
+ require.NoError(t, err)
+
+ event := &beat.Event{
+ Fields: common.MapStr{},
+ Meta: common.MapStr{
+ "message": testMessage,
+ },
+ }
+
+ expMeta := common.MapStr{
+ "message": testMessage,
+ "target": testMessageOutput["winlog"],
+ }
+ newEvent, err := f.Run(event)
+ assert.NoError(t, err)
+ assert.Equal(t, expMeta, newEvent.Meta)
+ assert.Equal(t, event.Fields, newEvent.Fields)
+ })
}
diff --git a/libbeat/processors/dissect/processor.go b/libbeat/processors/dissect/processor.go
index b3e8ac9f635..5b9edd3d7c2 100644
--- a/libbeat/processors/dissect/processor.go
+++ b/libbeat/processors/dissect/processor.go
@@ -104,21 +104,21 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) {
return event, err
}
+ backup := event.Clone()
+
if convertDataType {
event, err = p.mapper(event, mapInterfaceToMapStr(mc))
} else {
event, err = p.mapper(event, mapToMapStr(m))
}
if err != nil {
- return event, err
+ return backup, err
}
return event, nil
}
func (p *processor) mapper(event *beat.Event, m common.MapStr) (*beat.Event, error) {
- copy := event.Fields.Clone()
-
prefix := ""
if p.config.TargetPrefix != "" {
prefix = p.config.TargetPrefix + "."
@@ -129,7 +129,6 @@ func (p *processor) mapper(event *beat.Event, m common.MapStr) (*beat.Event, err
if _, err := event.GetValue(prefixKey); err == common.ErrKeyNotFound || p.config.OverwriteKeys {
event.PutValue(prefixKey, v)
} else {
- event.Fields = copy
// When the target key exists but is a string instead of a map.
if err != nil {
return event, errors.Wrapf(err, "cannot override existing key with `%s`", prefixKey)
diff --git a/libbeat/processors/dissect/processor_test.go b/libbeat/processors/dissect/processor_test.go
index 5a3d0217021..f46aa8bd78f 100644
--- a/libbeat/processors/dissect/processor_test.go
+++ b/libbeat/processors/dissect/processor_test.go
@@ -152,6 +152,34 @@ func TestProcessor(t *testing.T) {
}
})
}
+
+ t.Run("supports metadata as a target", func(t *testing.T) {
+ e := &beat.Event{
+ Meta: common.MapStr{
+ "message": "hello world",
+ },
+ }
+ expMeta := common.MapStr{
+ "message": "hello world",
+ "key": "world",
+ }
+
+ c := map[string]interface{}{
+ "tokenizer": "hello %{key}",
+ "field": "@metadata.message",
+ "target_prefix": "@metadata",
+ }
+ cfg, err := common.NewConfigFrom(c)
+ assert.NoError(t, err)
+
+ processor, err := NewProcessor(cfg)
+ assert.NoError(t, err)
+
+ newEvent, err := processor.Run(e)
+ assert.NoError(t, err)
+ assert.Equal(t, expMeta, newEvent.Meta)
+ assert.Equal(t, e.Fields, newEvent.Fields)
+ })
}
func TestFieldDoesntExist(t *testing.T) {
diff --git a/libbeat/processors/dns/dns_test.go b/libbeat/processors/dns/dns_test.go
index 2b26ff50081..32f95c7b810 100644
--- a/libbeat/processors/dns/dns_test.go
+++ b/libbeat/processors/dns/dns_test.go
@@ -92,6 +92,36 @@ func TestDNSProcessorRun(t *testing.T) {
v, _ := event.GetValue("source.domain")
assert.Equal(t, gatewayName, v)
})
+
+ t.Run("metadata target", func(t *testing.T) {
+ config := defaultConfig
+ config.reverseFlat = map[string]string{
+ "@metadata.ip": "@metadata.domain",
+ }
+
+ p := &processor{
+ Config: config,
+ resolver: &stubResolver{},
+ log: logp.NewLogger(logName),
+ }
+
+ event := &beat.Event{
+ Meta: common.MapStr{
+ "ip": gatewayIP,
+ },
+ }
+
+ expMeta := common.MapStr{
+ "ip": gatewayIP,
+ "domain": gatewayName,
+ }
+
+ newEvent, err := p.Run(event)
+ assert.NoError(t, err)
+ assert.Equal(t, expMeta, newEvent.Meta)
+ assert.Equal(t, event.Fields, newEvent.Fields)
+ })
+
}
func TestDNSProcessorTagOnFailure(t *testing.T) {
diff --git a/libbeat/processors/extract_array/extract_array.go b/libbeat/processors/extract_array/extract_array.go
index 1c9fc05da93..4e833cc2851 100644
--- a/libbeat/processors/extract_array/extract_array.go
+++ b/libbeat/processors/extract_array/extract_array.go
@@ -129,10 +129,9 @@ func (f *extractArrayProcessor) Run(event *beat.Event) (*beat.Event, error) {
return event, errors.Wrapf(err, "unsupported type for field %s: got: %s needed: array", f.config.Field, t.String())
}
- saved := *event
+ saved := event
if f.config.FailOnError {
- saved.Fields = event.Fields.Clone()
- saved.Meta = event.Meta.Clone()
+ saved = event.Clone()
}
n := array.Len()
@@ -141,7 +140,7 @@ func (f *extractArrayProcessor) Run(event *beat.Event) (*beat.Event, error) {
if !f.config.FailOnError {
continue
}
- return &saved, errors.Errorf("index %d exceeds length of %d when processing mapping for field %s", mapping.from, n, mapping.to)
+ return saved, errors.Errorf("index %d exceeds length of %d when processing mapping for field %s", mapping.from, n, mapping.to)
}
cell := array.Index(mapping.from)
// checking for CanInterface() here is done to prevent .Interface() from
@@ -155,14 +154,14 @@ func (f *extractArrayProcessor) Run(event *beat.Event) (*beat.Event, error) {
if !f.config.FailOnError {
continue
}
- return &saved, errors.Errorf("target field %s already has a value. Set the overwrite_keys flag or drop/rename the field first", mapping.to)
+ return saved, errors.Errorf("target field %s already has a value. Set the overwrite_keys flag or drop/rename the field first", mapping.to)
}
}
if _, err = event.PutValue(mapping.to, clone(cell.Interface())); err != nil {
if !f.config.FailOnError {
continue
}
- return &saved, errors.Wrapf(err, "failed setting field %s", mapping.to)
+ return saved, errors.Wrapf(err, "failed setting field %s", mapping.to)
}
}
return event, nil
diff --git a/libbeat/processors/extract_array/extract_array_test.go b/libbeat/processors/extract_array/extract_array_test.go
index 1707e317530..b3d2aa63393 100644
--- a/libbeat/processors/extract_array/extract_array_test.go
+++ b/libbeat/processors/extract_array/extract_array_test.go
@@ -267,4 +267,38 @@ func TestExtractArrayProcessor_Run(t *testing.T) {
t.Log(result)
})
}
+
+ t.Run("supports metadata as a target", func(t *testing.T) {
+
+ config := common.MapStr{
+ "field": "@metadata.array",
+ "mappings": common.MapStr{
+ "@metadata.first": 1,
+ "@metadata.second": 2,
+ },
+ }
+
+ event := &beat.Event{
+ Meta: common.MapStr{
+ "array": []interface{}{"zero", 1, common.MapStr{"two": 2}},
+ },
+ }
+
+ expMeta := common.MapStr{
+ "array": []interface{}{"zero", 1, common.MapStr{"two": 2}},
+ "first": 1,
+ "second": common.MapStr{"two": 2},
+ }
+
+ cfg := common.MustNewConfigFrom(config)
+ processor, err := New(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ result, err := processor.Run(event)
+ assert.NoError(t, err)
+
+ assert.Equal(t, expMeta, result.Meta)
+ assert.Equal(t, event.Fields, result.Fields)
+ })
}
diff --git a/libbeat/processors/fingerprint/fingerprint_test.go b/libbeat/processors/fingerprint/fingerprint_test.go
index 044c57a2eed..8ee7c848d83 100644
--- a/libbeat/processors/fingerprint/fingerprint_test.go
+++ b/libbeat/processors/fingerprint/fingerprint_test.go
@@ -81,6 +81,31 @@ func TestWithConfig(t *testing.T) {
assert.Equal(t, test.want, v)
})
}
+
+ t.Run("supports metadata as a target", func(t *testing.T) {
+ config := common.MustNewConfigFrom(common.MapStr{
+ "fields": []string{"@metadata.message"},
+ "target_field": "@metadata.fingerprint",
+ })
+ p, err := New(config)
+ require.NoError(t, err)
+
+ testEvent := &beat.Event{
+ Timestamp: time.Unix(1635443183, 0),
+ Meta: common.MapStr{
+ "message": "hello world",
+ },
+ }
+
+ expMeta := common.MapStr{
+ "message": "hello world",
+ "fingerprint": "1a3fe8251076ed8de5fd99ce529d2b9971c54851d4d45f5a576bed91d0cc4202",
+ }
+ newEvent, err := p.Run(testEvent)
+ assert.NoError(t, err)
+ assert.Equal(t, expMeta, newEvent.Meta)
+ assert.Equal(t, testEvent.Fields, newEvent.Fields)
+ })
}
func TestHashMethods(t *testing.T) {
diff --git a/libbeat/processors/registered_domain/registered_domain_test.go b/libbeat/processors/registered_domain/registered_domain_test.go
index 085a3eb787a..0b99e5ebdab 100644
--- a/libbeat/processors/registered_domain/registered_domain_test.go
+++ b/libbeat/processors/registered_domain/registered_domain_test.go
@@ -93,4 +93,30 @@ func TestProcessorRun(t *testing.T) {
assert.Equal(t, tc.ETLD, etld)
}
}
+
+ t.Run("support metadata as a target", func(t *testing.T) {
+ c := defaultConfig()
+ c.Field = "@metadata.domain"
+ c.TargetField = "@metadata.registered_domain"
+ c.TargetSubdomainField = "@metadata.subdomain"
+ c.TargetETLDField = "@metadata.etld"
+ p, err := newRegisteredDomain(c)
+
+ evt := &beat.Event{
+ Meta: common.MapStr{
+ "domain": "www.google.com",
+ },
+ }
+ expMeta := common.MapStr{
+ "domain": "www.google.com",
+ "registered_domain": "google.com",
+ "subdomain": "www",
+ "etld": "com",
+ }
+
+ newEvt, err := p.Run(evt)
+ assert.NoError(t, err)
+ assert.Equal(t, expMeta, newEvt.Meta)
+ assert.Equal(t, evt.Fields, newEvt.Fields)
+ })
}
diff --git a/libbeat/processors/timestamp/timestamp_test.go b/libbeat/processors/timestamp/timestamp_test.go
index 466db497cfd..54b7d8b9e95 100644
--- a/libbeat/processors/timestamp/timestamp_test.go
+++ b/libbeat/processors/timestamp/timestamp_test.go
@@ -302,3 +302,38 @@ func TestTimezone(t *testing.T) {
})
}
}
+
+func TestMetadataTarget(t *testing.T) {
+ datetime := "2006-01-02T15:04:05Z"
+ c := defaultConfig()
+ c.Field = "@metadata.time"
+ c.TargetField = "@metadata.ts"
+ c.Layouts = append(c.Layouts, time.RFC3339)
+ c.Timezone = cfgtype.MustNewTimezone("EST")
+
+ p, err := newFromConfig(c)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ evt := &beat.Event{
+ Meta: common.MapStr{
+ "time": datetime,
+ },
+ }
+
+ newEvt, err := p.Run(evt)
+ assert.NoError(t, err)
+
+ expTs, err := time.Parse(time.RFC3339, datetime)
+ assert.NoError(t, err)
+
+ expMeta := common.MapStr{
+ "time": datetime,
+ "ts": expTs.UTC(),
+ }
+
+ assert.Equal(t, expMeta, newEvt.Meta)
+ assert.Equal(t, evt.Fields, newEvt.Fields)
+ assert.Equal(t, evt.Timestamp, newEvt.Timestamp)
+}
diff --git a/libbeat/processors/translate_sid/translatesid_test.go b/libbeat/processors/translate_sid/translatesid_test.go
index 267e726250d..3f27b6569ce 100644
--- a/libbeat/processors/translate_sid/translatesid_test.go
+++ b/libbeat/processors/translate_sid/translatesid_test.go
@@ -30,6 +30,7 @@ import (
"golang.org/x/sys/windows"
"github.com/elastic/beats/v7/libbeat/beat"
+ "github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/winlogbeat/sys/winevent"
)
@@ -83,6 +84,33 @@ func TestTranslateSID(t *testing.T) {
}
})
}
+
+ t.Run("supports metadata as a target", func(t *testing.T) {
+ p, err := newFromConfig(config{
+ Field: "@metadata.sid",
+ DomainTarget: "@metadata.domain",
+ AccountNameTarget: "@metadata.account",
+ AccountTypeTarget: "@metadata.type",
+ })
+ assert.NoError(t, err)
+ evt := &beat.Event{
+ Meta: common.MapStr{
+ "sid": "S-1-5-7",
+ },
+ }
+
+ expMeta := common.MapStr{
+ "sid": "S-1-5-32-544",
+ "domain": "BUILTIN",
+ "account": "Administrators",
+ "type": winevent.SidTypeAlias,
+ }
+
+ newEvt, err := p.Run(evt)
+ assert.NoError(t, err)
+ assert.Equal(t, expMeta, newEvt.Meta)
+ assert.Equal(t, evt.Fields, newEvt.Fields)
+ })
}
func TestTranslateSIDEmptyTarget(t *testing.T) {
diff --git a/libbeat/processors/urldecode/urldecode.go b/libbeat/processors/urldecode/urldecode.go
index 6fc9cb8386d..110b6387a2d 100644
--- a/libbeat/processors/urldecode/urldecode.go
+++ b/libbeat/processors/urldecode/urldecode.go
@@ -73,9 +73,9 @@ func New(c *common.Config) (processors.Processor, error) {
}
func (p *urlDecode) Run(event *beat.Event) (*beat.Event, error) {
- var backup common.MapStr
+ var backup *beat.Event
if p.config.FailOnError {
- backup = event.Fields.Clone()
+ backup = event.Clone()
}
for _, field := range p.config.Fields {
@@ -84,7 +84,7 @@ func (p *urlDecode) Run(event *beat.Event) (*beat.Event, error) {
errMsg := fmt.Errorf("failed to decode fields in urldecode processor: %v", err)
p.log.Debug(errMsg.Error())
if p.config.FailOnError {
- event.Fields = backup
+ event = backup
event.PutValue("error.message", errMsg.Error())
return event, err
}
diff --git a/libbeat/processors/urldecode/urldecode_test.go b/libbeat/processors/urldecode/urldecode_test.go
index 8d962bd9470..7e32aad3a56 100644
--- a/libbeat/processors/urldecode/urldecode_test.go
+++ b/libbeat/processors/urldecode/urldecode_test.go
@@ -211,4 +211,32 @@ func TestURLDecode(t *testing.T) {
})
}
+ t.Run("supports metadata as a target", func(t *testing.T) {
+ t.Parallel()
+
+ config := urlDecodeConfig{
+ Fields: []fromTo{{
+ From: "@metadata.field", To: "@metadata.target",
+ }},
+ }
+
+ f := &urlDecode{
+ log: logp.NewLogger("urldecode"),
+ config: config,
+ }
+
+ event := &beat.Event{
+ Meta: common.MapStr{
+ "field": "correct%20data",
+ },
+ }
+ expMeta := common.MapStr{
+ "field": "correct%20data",
+ "target": "correct data",
+ }
+ newEvent, err := f.Run(event)
+ assert.NoError(t, err)
+ assert.Equal(t, expMeta, newEvent.Meta)
+ assert.Equal(t, event.Fields, newEvent.Fields)
+ })
}