From ba6d1768469a2d92e143c5e4b5ba9dd969a5759d Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Fri, 4 Feb 2022 19:46:06 +0100 Subject: [PATCH] Add metadata change support for processors (#30183) Some of the processors that are configured with a target field didn't support special cases with the target field is a `@timestamp` or a `@metadata` sub-field. This change is to make it consistent across processors or to document why the processor does not need this change. Also added tests for each processor that is supposed to support metadata as a target. --- CHANGELOG.next.asciidoc | 1 + libbeat/beat/event.go | 11 + .../actions/add_network_direction_test.go | 26 +++ libbeat/processors/actions/copy_fields.go | 18 +- .../processors/actions/copy_fields_test.go | 31 +++ .../processors/actions/decode_base64_field.go | 6 +- .../actions/decode_base64_field_test.go | 33 +++ .../actions/decode_json_fields_test.go | 40 ++++ .../actions/decompress_gzip_field.go | 6 +- .../actions/decompress_gzip_field_test.go | 34 +++ .../processors/actions/detect_mime_type.go | 8 +- .../actions/detect_mime_type_test.go | 22 ++ .../actions/docs/add_fields.asciidoc | 19 +- .../processors/actions/docs/add_tags.asciidoc | 2 +- .../actions/docs/copy_fields.asciidoc | 2 +- .../actions/docs/detect_mime_type.asciidoc | 2 +- .../actions/docs/include_fields.asciidoc | 2 +- .../processors/actions/docs/rename.asciidoc | 2 +- .../processors/actions/docs/replace.asciidoc | 12 +- .../actions/docs/truncate_fields.asciidoc | 2 +- .../processors/actions/drop_fields_test.go | 60 +++++ .../processors/actions/extract_field_test.go | 33 +++ libbeat/processors/actions/rename.go | 20 +- libbeat/processors/actions/rename_test.go | 30 ++- libbeat/processors/actions/replace.go | 14 +- libbeat/processors/actions/replace_test.go | 31 ++- libbeat/processors/actions/truncate_fields.go | 6 +- .../actions/truncate_fields_test.go | 35 +++ libbeat/processors/add_id/add_id_test.go | 22 ++ .../add_kubernetes_metadata/kubernetes.go | 3 + .../add_process_metadata.go | 12 +- .../add_process_metadata_test.go | 36 +++ .../communityid/communityid_test.go | 20 ++ libbeat/processors/convert/convert.go | 8 +- libbeat/processors/convert/convert_test.go | 24 ++ .../decode_csv_fields/decode_csv_fields.go | 7 +- .../decode_csv_fields_test.go | 31 +++ .../processors/decode_xml/decode_xml_test.go | 48 ++++ .../decode_xml_wineventlog/processor_test.go | 208 ++++++++---------- libbeat/processors/dissect/processor.go | 7 +- libbeat/processors/dissect/processor_test.go | 28 +++ libbeat/processors/dns/dns_test.go | 30 +++ .../processors/extract_array/extract_array.go | 11 +- .../extract_array/extract_array_test.go | 34 +++ .../fingerprint/fingerprint_test.go | 25 +++ .../registered_domain_test.go | 26 +++ .../processors/timestamp/timestamp_test.go | 35 +++ .../translate_sid/translatesid_test.go | 28 +++ libbeat/processors/urldecode/urldecode.go | 6 +- .../processors/urldecode/urldecode_test.go | 28 +++ 50 files changed, 987 insertions(+), 198 deletions(-) create mode 100644 libbeat/processors/actions/drop_fields_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 933500f7cfb..a3760f72894 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -32,6 +32,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif - The extension of the log files of Beats and Elastic Agent is changed to `.ndjson`. If you are collecting the logs, you must change the path configuration to `/path/to/logs/{beatname}*.ndjson` to avoid any issues. {pull}28927[28927] - Remove legacy support for SSLv3. {pull}30071[30071] - `add_fields` processor is now able to set metadata in events {pull}30092[30092] +- Add metadata change support for some processors {pull}30183[30183] *Auditbeat* diff --git a/libbeat/beat/event.go b/libbeat/beat/event.go index 7fd016a14dc..bc8ffe6c3e9 100644 --- a/libbeat/beat/event.go +++ b/libbeat/beat/event.go @@ -71,6 +71,17 @@ func (e *Event) GetValue(key string) (interface{}, error) { return e.Fields.GetValue(key) } +// Clone creates an exact copy of the event +func (e *Event) Clone() *Event { + return &Event{ + Timestamp: e.Timestamp, + Meta: e.Meta.Clone(), + Fields: e.Fields.Clone(), + Private: e.Private, + TimeSeries: e.TimeSeries, + } +} + // DeepUpdate recursively copies the key-value pairs from `d` to various properties of the event. // When the key equals `@timestamp` it's set as the `Timestamp` property of the event. // When the key equals `@metadata` the update is routed into the `Meta` map instead of `Fields` diff --git a/libbeat/processors/actions/add_network_direction_test.go b/libbeat/processors/actions/add_network_direction_test.go index 54fb170426b..da507a18374 100644 --- a/libbeat/processors/actions/add_network_direction_test.go +++ b/libbeat/processors/actions/add_network_direction_test.go @@ -77,4 +77,30 @@ func TestNetworkDirection(t *testing.T) { } }) } + + t.Run("supports metadata as a target", func(t *testing.T) { + evt := beat.Event{ + Meta: common.MapStr{}, + Fields: common.MapStr{ + "source": "1.1.1.1", + "destination": "8.8.8.8", + }, + } + p, err := NewAddNetworkDirection(common.MustNewConfigFrom(map[string]interface{}{ + "source": "source", + "destination": "destination", + "target": "@metadata.direction", + "internal_networks": "private", + })) + require.NoError(t, err) + + expectedMeta := common.MapStr{ + "direction": "external", + } + + observed, err := p.Run(&evt) + require.NoError(t, err) + require.Equal(t, expectedMeta, observed.Meta) + require.Equal(t, evt.Fields, observed.Fields) + }) } diff --git a/libbeat/processors/actions/copy_fields.go b/libbeat/processors/actions/copy_fields.go index 43a797e0fd5..0b77da01450 100644 --- a/libbeat/processors/actions/copy_fields.go +++ b/libbeat/processors/actions/copy_fields.go @@ -69,18 +69,18 @@ func NewCopyFields(c *common.Config) (processors.Processor, error) { } func (f *copyFields) Run(event *beat.Event) (*beat.Event, error) { - var backup common.MapStr + var backup *beat.Event if f.config.FailOnError { - backup = event.Fields.Clone() + backup = event.Clone() } for _, field := range f.config.Fields { - err := f.copyField(field.From, field.To, event.Fields) + err := f.copyField(field.From, field.To, event) if err != nil { errMsg := fmt.Errorf("Failed to copy fields in copy_fields processor: %s", err) f.logger.Debug(errMsg.Error()) if f.config.FailOnError { - event.Fields = backup + event = backup event.PutValue("error.message", errMsg.Error()) return event, err } @@ -90,13 +90,13 @@ func (f *copyFields) Run(event *beat.Event) (*beat.Event, error) { return event, nil } -func (f *copyFields) copyField(from string, to string, fields common.MapStr) error { - exists, _ := fields.HasKey(to) - if exists { +func (f *copyFields) copyField(from string, to string, event *beat.Event) error { + _, err := event.GetValue(to) + if err == nil { return fmt.Errorf("target field %s already exists, drop or rename this field first", to) } - value, err := fields.GetValue(from) + value, err := event.GetValue(from) if err != nil { if f.config.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound { return nil @@ -104,7 +104,7 @@ func (f *copyFields) copyField(from string, to string, fields common.MapStr) err return fmt.Errorf("could not fetch value for key: %s, Error: %s", from, err) } - _, err = fields.Put(to, cloneValue(value)) + _, err = event.PutValue(to, cloneValue(value)) if err != nil { return fmt.Errorf("could not copy value to %s: %v, %+v", to, value, err) } diff --git a/libbeat/processors/actions/copy_fields_test.go b/libbeat/processors/actions/copy_fields_test.go index 96b382a596a..5e890b5cbe6 100644 --- a/libbeat/processors/actions/copy_fields_test.go +++ b/libbeat/processors/actions/copy_fields_test.go @@ -168,4 +168,35 @@ func TestCopyFields(t *testing.T) { assert.Equal(t, test.Expected, newEvent.Fields) }) } + + t.Run("supports metadata as a target", func(t *testing.T) { + p := copyFields{ + copyFieldsConfig{ + Fields: []fromTo{ + { + From: "@metadata.message", + To: "@metadata.message_copied", + }, + }, + }, + log, + } + + event := &beat.Event{ + Meta: common.MapStr{ + "message": "please copy this line", + }, + } + + expMeta := common.MapStr{ + "message": "please copy this line", + "message_copied": "please copy this line", + } + + newEvent, err := p.Run(event) + assert.NoError(t, err) + + assert.Equal(t, expMeta, newEvent.Meta) + assert.Equal(t, event.Fields, newEvent.Fields) + }) } diff --git a/libbeat/processors/actions/decode_base64_field.go b/libbeat/processors/actions/decode_base64_field.go index 55e61c3fd61..63973be2129 100644 --- a/libbeat/processors/actions/decode_base64_field.go +++ b/libbeat/processors/actions/decode_base64_field.go @@ -74,10 +74,10 @@ func NewDecodeBase64Field(c *common.Config) (processors.Processor, error) { } func (f *decodeBase64Field) Run(event *beat.Event) (*beat.Event, error) { - var backup common.MapStr + var backup *beat.Event // Creates a copy of the event to revert in case of failure if f.config.FailOnError { - backup = event.Fields.Clone() + backup = event.Clone() } err := f.decodeField(event) @@ -85,7 +85,7 @@ func (f *decodeBase64Field) Run(event *beat.Event) (*beat.Event, error) { errMsg := fmt.Errorf("failed to decode base64 fields in processor: %v", err) f.log.Debug(errMsg.Error()) if f.config.FailOnError { - event.Fields = backup + event = backup event.PutValue("error.message", errMsg.Error()) return event, err } diff --git a/libbeat/processors/actions/decode_base64_field_test.go b/libbeat/processors/actions/decode_base64_field_test.go index b0bc3689709..3355400e9f0 100644 --- a/libbeat/processors/actions/decode_base64_field_test.go +++ b/libbeat/processors/actions/decode_base64_field_test.go @@ -221,4 +221,37 @@ func TestDecodeBase64Run(t *testing.T) { assert.Equal(t, test.Output, newEvent.Fields) }) } + + t.Run("supports a metadata field", func(t *testing.T) { + config := base64Config{ + Field: fromTo{ + From: "field1", + To: "@metadata.field", + }, + } + + event := &beat.Event{ + Meta: common.MapStr{}, + Fields: common.MapStr{ + "field1": "Y29ycmVjdCBkYXRh", + }, + } + + f := &decodeBase64Field{ + log: logp.NewLogger(processorName), + config: config, + } + + expectedFields := common.MapStr{ + "field1": "Y29ycmVjdCBkYXRh", + } + expectedMeta := common.MapStr{ + "field": "correct data", + } + + newEvent, err := f.Run(event) + assert.NoError(t, err) + assert.Equal(t, expectedFields, newEvent.Fields) + assert.Equal(t, expectedMeta, newEvent.Meta) + }) } diff --git a/libbeat/processors/actions/decode_json_fields_test.go b/libbeat/processors/actions/decode_json_fields_test.go index f58efd29b46..81d90dcf32d 100644 --- a/libbeat/processors/actions/decode_json_fields_test.go +++ b/libbeat/processors/actions/decode_json_fields_test.go @@ -257,6 +257,46 @@ func TestTargetRootOption(t *testing.T) { assert.Equal(t, expected.String(), actual.String()) } +func TestTargetMetadata(t *testing.T) { + event := &beat.Event{ + Fields: common.MapStr{ + "msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}", + "pipeline": "us1", + }, + Meta: common.MapStr{}, + } + + testConfig, _ = common.NewConfigFrom(map[string]interface{}{ + "fields": fields, + "process_array": false, + "max_depth": 2, + "target": "@metadata.json", + }) + + log := logp.NewLogger("decode_json_fields_test") + + p, err := NewDecodeJSONFields(testConfig) + if err != nil { + log.Error("Error initializing decode_json_fields") + t.Fatal(err) + } + + actual, _ := p.Run(event) + + expectedMeta := common.MapStr{ + "json": map[string]interface{}{ + "log": map[string]interface{}{ + "level": "info", + }, + "stream": "stderr", + "count": int64(3), + }, + } + + assert.Equal(t, expectedMeta, actual.Meta) + assert.Equal(t, event.Fields, actual.Fields) +} + func TestNotJsonObjectOrArray(t *testing.T) { var cases = []struct { MaxDepth int diff --git a/libbeat/processors/actions/decompress_gzip_field.go b/libbeat/processors/actions/decompress_gzip_field.go index 21c5b7f88b5..f8e77983d25 100644 --- a/libbeat/processors/actions/decompress_gzip_field.go +++ b/libbeat/processors/actions/decompress_gzip_field.go @@ -67,9 +67,9 @@ func NewDecompressGzipFields(c *common.Config) (processors.Processor, error) { // Run applies the decompress_gzip_fields processor to an event. func (f *decompressGzipField) Run(event *beat.Event) (*beat.Event, error) { - var backup common.MapStr + var backup *beat.Event if f.config.FailOnError { - backup = event.Fields.Clone() + backup = event.Clone() } err := f.decompressGzipField(event) @@ -77,7 +77,7 @@ func (f *decompressGzipField) Run(event *beat.Event) (*beat.Event, error) { errMsg := fmt.Errorf("Failed to decompress field in decompress_gzip_field processor: %v", err) f.log.Debug(errMsg.Error()) if f.config.FailOnError { - event.Fields = backup + event = backup event.PutValue("error.message", errMsg.Error()) return event, err } diff --git a/libbeat/processors/actions/decompress_gzip_field_test.go b/libbeat/processors/actions/decompress_gzip_field_test.go index 73d000563b6..8beaf8b71a0 100644 --- a/libbeat/processors/actions/decompress_gzip_field_test.go +++ b/libbeat/processors/actions/decompress_gzip_field_test.go @@ -188,4 +188,38 @@ func TestDecompressGzip(t *testing.T) { assert.Equal(t, test.output, newEvent.Fields) }) } + + t.Run("supports metadata as a target", func(t *testing.T) { + t.Parallel() + + event := &beat.Event{ + Fields: common.MapStr{ + "field1": []byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 74, 73, 77, 206, 207, 45, 40, 74, 45, 46, 78, 77, 81, 72, 73, 44, 73, 4, 4, 0, 0, 255, 255, 108, 158, 105, 19, 17, 0, 0, 0}, + }, + Meta: common.MapStr{}, + } + + expectedMeta := common.MapStr{ + "field": "decompressed data", + } + + config := decompressGzipFieldConfig{ + Field: fromTo{ + From: "field1", To: "@metadata.field", + }, + IgnoreMissing: false, + FailOnError: true, + } + + f := &decompressGzipField{ + log: logp.NewLogger("decompress_gzip_field"), + config: config, + } + + newEvent, err := f.Run(event) + assert.NoError(t, err) + + assert.Equal(t, expectedMeta, newEvent.Meta) + assert.Equal(t, event.Fields, newEvent.Fields) + }) } diff --git a/libbeat/processors/actions/detect_mime_type.go b/libbeat/processors/actions/detect_mime_type.go index f53794bc1ff..59d7de12ba0 100644 --- a/libbeat/processors/actions/detect_mime_type.go +++ b/libbeat/processors/actions/detect_mime_type.go @@ -54,7 +54,7 @@ func NewDetectMimeType(cfg *common.Config) (processors.Processor, error) { func (m *mimeTypeProcessor) Run(event *beat.Event) (*beat.Event, error) { valI, err := event.GetValue(m.Field) if err != nil { - // doesn't have the required fieldd value to analyze + // doesn't have the required field value to analyze return event, nil } val, _ := valI.(string) @@ -63,11 +63,9 @@ func (m *mimeTypeProcessor) Run(event *beat.Event) (*beat.Event, error) { return event, nil } if mimeType := mime.Detect(val); mimeType != "" { - event.Fields.DeepUpdate(common.MapStr{ - m.Target: mimeType, - }) + _, err = event.PutValue(m.Target, mimeType) } - return event, nil + return event, err } func (m *mimeTypeProcessor) String() string { diff --git a/libbeat/processors/actions/detect_mime_type_test.go b/libbeat/processors/actions/detect_mime_type_test.go index 51de6c9062f..1c8bcecbd0f 100644 --- a/libbeat/processors/actions/detect_mime_type_test.go +++ b/libbeat/processors/actions/detect_mime_type_test.go @@ -44,6 +44,28 @@ func TestMimeTypeFromTo(t *testing.T) { require.Equal(t, "text/plain; charset=utf-8", enriched) } +func TestMimeTypeFromToMetadata(t *testing.T) { + evt := beat.Event{ + Meta: common.MapStr{}, + Fields: common.MapStr{ + "foo.bar.baz": "hello world!", + }, + } + expectedMeta := common.MapStr{ + "field": "text/plain; charset=utf-8", + } + p, err := NewDetectMimeType(common.MustNewConfigFrom(map[string]interface{}{ + "field": "foo.bar.baz", + "target": "@metadata.field", + })) + require.NoError(t, err) + + observed, err := p.Run(&evt) + require.NoError(t, err) + require.Equal(t, expectedMeta, observed.Meta) + require.Equal(t, evt.Fields, observed.Fields) +} + func TestMimeTypeTestNoMatch(t *testing.T) { evt := beat.Event{ Fields: common.MapStr{ diff --git a/libbeat/processors/actions/docs/add_fields.asciidoc b/libbeat/processors/actions/docs/add_fields.asciidoc index 574c4a684c2..7023948afaf 100644 --- a/libbeat/processors/actions/docs/add_fields.asciidoc +++ b/libbeat/processors/actions/docs/add_fields.asciidoc @@ -6,14 +6,14 @@ ++++ The `add_fields` processor adds additional fields to the event. Fields can be -scalar values, arrays, dictionaries, or any nested combination of these. -The `add_fields` processor will overwrite the target field if it already exists. +scalar values, arrays, dictionaries, or any nested combination of these. +The `add_fields` processor will overwrite the target field if it already exists. By default the fields that you specify will be grouped under the `fields` sub-dictionary in the event. To group the fields under a different sub-dictionary, use the `target` setting. To store the fields as top-level fields, set `target: ''`. -`target`:: (Optional) Sub-dictionary to put all fields into. Defaults to `fields`. +`target`:: (Optional) Sub-dictionary to put all fields into. Defaults to `fields`. Setting this to `@metadata` will add values to the event metadata instead of fields. `fields`:: Fields to be added. @@ -40,3 +40,16 @@ Adds these fields to any event: } } ------------------------------------------------------------------------------- + +This configuration will alter the event metadata: + +[source,yaml] +------------------------------------------------------------------------------ +processors: + - add_fields: + target: '@metadata' + fields: + op_type: "index" +------------------------------------------------------------------------------ + +When the event is ingested (e.g. by Elastisearch) the document will have `op_type: "index"` set as a metadata field. diff --git a/libbeat/processors/actions/docs/add_tags.asciidoc b/libbeat/processors/actions/docs/add_tags.asciidoc index e338d246668..2965489839b 100644 --- a/libbeat/processors/actions/docs/add_tags.asciidoc +++ b/libbeat/processors/actions/docs/add_tags.asciidoc @@ -9,7 +9,7 @@ The `add_tags` processor adds tags to a list of tags. If the target field alread the tags are appended to the existing list of tags. `tags`:: List of tags to add. -`target`:: (Optional) Field the tags will be added to. Defaults to `tags`. +`target`:: (Optional) Field the tags will be added to. Defaults to `tags`. Setting tags in `@metadata` is not supported. For example, this configuration: diff --git a/libbeat/processors/actions/docs/copy_fields.asciidoc b/libbeat/processors/actions/docs/copy_fields.asciidoc index 07a56dbf149..98e7862f56d 100644 --- a/libbeat/processors/actions/docs/copy_fields.asciidoc +++ b/libbeat/processors/actions/docs/copy_fields.asciidoc @@ -7,7 +7,7 @@ The `copy_fields` processor copies a field to another one. -`fields`:: List of `from` and `to` pairs to copy from and to. +`fields`:: List of `from` and `to` pairs to copy from and to. It's supported to use `@metadata.` prefix for `from` and `to` and copy values not just in/from/to the event fields but also in/from/to the event metadata. `fail_on_error`:: (Optional) If set to `true` and an error occurs, the changes are reverted and the original is returned. If set to `false`, processing continues if an error occurs. Default is `true`. `ignore_missing`:: (Optional) Indicates whether to ignore events that lack the source diff --git a/libbeat/processors/actions/docs/detect_mime_type.asciidoc b/libbeat/processors/actions/docs/detect_mime_type.asciidoc index c93c6f882e9..d4532f3fe1f 100644 --- a/libbeat/processors/actions/docs/detect_mime_type.asciidoc +++ b/libbeat/processors/actions/docs/detect_mime_type.asciidoc @@ -7,7 +7,7 @@ The `detect_mime_type` processor attempts to detect a mime type for a field that contains a given stream of bytes. The `field` key contains the field used as -the data source and the `target` key contains the field to populate with the detected type +the data source and the `target` key contains the field to populate with the detected type. It's supported to use `@metadata.` prefix for `target` and set the value in the event metadata instead of fields. [source,yaml] ------- diff --git a/libbeat/processors/actions/docs/include_fields.asciidoc b/libbeat/processors/actions/docs/include_fields.asciidoc index 86b2c70e6ff..dee585dac90 100644 --- a/libbeat/processors/actions/docs/include_fields.asciidoc +++ b/libbeat/processors/actions/docs/include_fields.asciidoc @@ -7,7 +7,7 @@ The `include_fields` processor specifies which fields to export if a certain condition is fulfilled. The condition is optional. If it's missing, the -specified fields are always exported. The `@timestamp` and `type` fields are +specified fields are always exported. The `@timestamp`, `@metadata` and `type` fields are always exported, even if they are not defined in the `include_fields` list. [source,yaml] diff --git a/libbeat/processors/actions/docs/rename.asciidoc b/libbeat/processors/actions/docs/rename.asciidoc index c78a814a975..55d4738dbaa 100644 --- a/libbeat/processors/actions/docs/rename.asciidoc +++ b/libbeat/processors/actions/docs/rename.asciidoc @@ -8,7 +8,7 @@ The `rename` processor specifies a list of fields to rename. Under the `fields` key, each entry contains a `from: old-key` and a `to: new-key` pair, where: -* `from` is the original field name +* `from` is the original field name. It's supported to use `@metadata.` prefix for `from` and rename keys in the event metadata instead of event fields. * `to` is the target field name The `rename` processor cannot be used to overwrite fields. To overwrite fields diff --git a/libbeat/processors/actions/docs/replace.asciidoc b/libbeat/processors/actions/docs/replace.asciidoc index 2ad206b8e12..9193f4c103c 100644 --- a/libbeat/processors/actions/docs/replace.asciidoc +++ b/libbeat/processors/actions/docs/replace.asciidoc @@ -5,16 +5,16 @@ replace ++++ -The `replace` processor takes a list of fields to replace the field value -matching a pattern with replacement string. Under the `fields` key, each entry -contains a `field: field-name`, `pattern: regex-pattern` and +The `replace` processor takes a list of fields to replace the field value +matching a pattern with replacement string. Under the `fields` key, each entry +contains a `field: field-name`, `pattern: regex-pattern` and `replacement: replacement-string`, where: -* `field` is the original field name +* `field` is the original field name. It's supported to use `@metadata.` prefix for the fields and replace values in the event metadata instead of event fields. * `pattern` is the regex pattern to match the field's value * `replacement` is the replacement string to use to update the field's value -The `replace` processor cannot be used to replace value with a completely new value. +The `replace` processor cannot be used to replace value with a completely new value. TIP: The `replacement` field value can be used to truncate the `field` value or replace it with a new string. It can also be used for masking PII information. @@ -35,7 +35,7 @@ processors: The `replace` processor has following configuration settings: -`ignore_missing`:: (Optional) If set to `true`, no error is logged if the specified field +`ignore_missing`:: (Optional) If set to `true`, no error is logged if the specified field is missing. The default is `false`. `fail_on_error`:: (Optional) If set to `true` and there's an error, the replacement of diff --git a/libbeat/processors/actions/docs/truncate_fields.asciidoc b/libbeat/processors/actions/docs/truncate_fields.asciidoc index 667c5fb9884..d2e6c484710 100644 --- a/libbeat/processors/actions/docs/truncate_fields.asciidoc +++ b/libbeat/processors/actions/docs/truncate_fields.asciidoc @@ -8,7 +8,7 @@ The `truncate_fields` processor truncates a field to a given size. If the size of the field is smaller than the limit, the field is left as is. -`fields`:: List of fields to truncate. +`fields`:: List of fields to truncate. It's supported to use `@metadata.` prefix for the fields and truncate values in the event metadata instead of event fields. `max_bytes`:: Maximum number of bytes in a field. Mutually exclusive with `max_characters`. `max_characters`:: Maximum number of characters in a field. Mutually exclusive with `max_bytes`. `fail_on_error`:: (Optional) If set to true, in case of an error the changes to diff --git a/libbeat/processors/actions/drop_fields_test.go b/libbeat/processors/actions/drop_fields_test.go new file mode 100644 index 00000000000..8125564a6e1 --- /dev/null +++ b/libbeat/processors/actions/drop_fields_test.go @@ -0,0 +1,60 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package actions + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestDropFieldRun(t *testing.T) { + event := &beat.Event{ + Fields: common.MapStr{ + "field": "value", + }, + Meta: common.MapStr{ + "meta_field": "value", + }, + } + + t.Run("supports a normal field", func(t *testing.T) { + p := dropFields{ + Fields: []string{"field"}, + } + + newEvent, err := p.Run(event) + assert.NoError(t, err) + assert.Equal(t, common.MapStr{}, newEvent.Fields) + assert.Equal(t, event.Meta, newEvent.Meta) + }) + + t.Run("supports a metadata field", func(t *testing.T) { + p := dropFields{ + Fields: []string{"@metadata.meta_field"}, + } + + newEvent, err := p.Run(event) + assert.NoError(t, err) + assert.Equal(t, common.MapStr{}, newEvent.Meta) + assert.Equal(t, event.Fields, newEvent.Fields) + }) +} diff --git a/libbeat/processors/actions/extract_field_test.go b/libbeat/processors/actions/extract_field_test.go index be466a39da2..4e95b66fd28 100644 --- a/libbeat/processors/actions/extract_field_test.go +++ b/libbeat/processors/actions/extract_field_test.go @@ -106,6 +106,39 @@ func TestCommonPaths(t *testing.T) { // Event must be present, even on error assert.NotNil(t, event) } + + t.Run("supports a metadata field", func(t *testing.T) { + var config, _ = common.NewConfigFrom(map[string]interface{}{ + "field": "field", + "separator": "/", + "index": 3, + "target": "@metadata.field", + }) + + event := &beat.Event{ + Meta: common.MapStr{}, + Fields: common.MapStr{ + "field": "/var/lib/foo/bar", + }, + } + + expectedFields := common.MapStr{ + "field": "/var/lib/foo/bar", + } + expectedMeta := common.MapStr{ + "field": "bar", + } + + p, err := NewExtractField(config) + if err != nil { + t.Fatalf("error initializing extract_field: %s", err) + } + + newEvent, err := p.Run(event) + assert.NoError(t, err) + assert.Equal(t, expectedFields, newEvent.Fields) + assert.Equal(t, expectedMeta, newEvent.Meta) + }) } func runExtractField(t *testing.T, config *common.Config, input common.MapStr) (*beat.Event, error) { diff --git a/libbeat/processors/actions/rename.go b/libbeat/processors/actions/rename.go index 8be084a281b..170888d0b53 100644 --- a/libbeat/processors/actions/rename.go +++ b/libbeat/processors/actions/rename.go @@ -73,19 +73,19 @@ func NewRenameFields(c *common.Config) (processors.Processor, error) { } func (f *renameFields) Run(event *beat.Event) (*beat.Event, error) { - var backup common.MapStr + var backup *beat.Event // Creates a copy of the event to revert in case of failure if f.config.FailOnError { - backup = event.Fields.Clone() + backup = event.Clone() } for _, field := range f.config.Fields { - err := f.renameField(field.From, field.To, event.Fields) + err := f.renameField(field.From, field.To, event) if err != nil { errMsg := fmt.Errorf("Failed to rename fields in processor: %s", err) f.logger.Debug(errMsg.Error()) if f.config.FailOnError { - event.Fields = backup + event = backup event.PutValue("error.message", errMsg.Error()) return event, err } @@ -95,14 +95,14 @@ func (f *renameFields) Run(event *beat.Event) (*beat.Event, error) { return event, nil } -func (f *renameFields) renameField(from string, to string, fields common.MapStr) error { +func (f *renameFields) renameField(from string, to string, event *beat.Event) error { // Fields cannot be overwritten. Either the target field has to be dropped first or renamed first - exists, _ := fields.HasKey(to) - if exists { + _, err := event.GetValue(to) + if err == nil { return fmt.Errorf("target field %s already exists, drop or rename this field first", to) } - value, err := fields.GetValue(from) + value, err := event.GetValue(from) if err != nil { // Ignore ErrKeyNotFound errors if f.config.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound { @@ -112,12 +112,12 @@ func (f *renameFields) renameField(from string, to string, fields common.MapStr) } // Deletion must happen first to support cases where a becomes a.b - err = fields.Delete(from) + err = event.Delete(from) if err != nil { return fmt.Errorf("could not delete key: %s, %+v", from, err) } - _, err = fields.Put(to, value) + _, err = event.PutValue(to, value) if err != nil { return fmt.Errorf("could not put value: %s: %v, %v", to, value, err) } diff --git a/libbeat/processors/actions/rename_test.go b/libbeat/processors/actions/rename_test.go index 53c5a41d5bf..af9dfdd10c6 100644 --- a/libbeat/processors/actions/rename_test.go +++ b/libbeat/processors/actions/rename_test.go @@ -358,7 +358,7 @@ func TestRenameField(t *testing.T) { }, } - err := f.renameField(test.From, test.To, test.Input) + err := f.renameField(test.From, test.To, &beat.Event{Fields: test.Input}) if err != nil { assert.Equal(t, test.error, true) } @@ -366,4 +366,32 @@ func TestRenameField(t *testing.T) { assert.True(t, reflect.DeepEqual(test.Input, test.Output)) }) } + + t.Run("supports metadata as a target", func(t *testing.T) { + event := &beat.Event{ + Meta: common.MapStr{ + "a": "c", + }, + } + + expMeta := common.MapStr{ + "b": "c", + } + + f := &renameFields{ + config: renameFieldsConfig{ + Fields: []fromTo{ + { + From: "@metadata.a", + To: "@metadata.b", + }, + }, + }, + } + + newEvent, err := f.Run(event) + assert.NoError(t, err) + assert.Equal(t, expMeta, newEvent.Meta) + assert.Equal(t, event.Fields, newEvent.Fields) + }) } diff --git a/libbeat/processors/actions/replace.go b/libbeat/processors/actions/replace.go index 37245817050..9dfd84ba9a8 100644 --- a/libbeat/processors/actions/replace.go +++ b/libbeat/processors/actions/replace.go @@ -73,19 +73,19 @@ func NewReplaceString(c *common.Config) (processors.Processor, error) { } func (f *replaceString) Run(event *beat.Event) (*beat.Event, error) { - var backup common.MapStr + var backup *beat.Event // Creates a copy of the event to revert in case of failure if f.config.FailOnError { - backup = event.Fields.Clone() + backup = event.Clone() } for _, field := range f.config.Fields { - err := f.replaceField(field.Field, field.Pattern, field.Replacement, event.Fields) + err := f.replaceField(field.Field, field.Pattern, field.Replacement, event) if err != nil { errMsg := fmt.Errorf("Failed to replace fields in processor: %s", err) logp.Debug("replace", errMsg.Error()) if f.config.FailOnError { - event.Fields = backup + event = backup event.PutValue("error.message", errMsg.Error()) return event, err } @@ -95,8 +95,8 @@ func (f *replaceString) Run(event *beat.Event) (*beat.Event, error) { return event, nil } -func (f *replaceString) replaceField(field string, pattern *regexp.Regexp, replacement string, fields common.MapStr) error { - currentValue, err := fields.GetValue(field) +func (f *replaceString) replaceField(field string, pattern *regexp.Regexp, replacement string, event *beat.Event) error { + currentValue, err := event.GetValue(field) if err != nil { // Ignore ErrKeyNotFound errors if f.config.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound { @@ -106,7 +106,7 @@ func (f *replaceString) replaceField(field string, pattern *regexp.Regexp, repla } updatedString := pattern.ReplaceAllString(currentValue.(string), replacement) - _, err = fields.Put(field, updatedString) + _, err = event.PutValue(field, updatedString) if err != nil { return fmt.Errorf("could not put value: %s: %v, %v", replacement, currentValue, err) } diff --git a/libbeat/processors/actions/replace_test.go b/libbeat/processors/actions/replace_test.go index aa95b4fd9ea..bf54d528583 100644 --- a/libbeat/processors/actions/replace_test.go +++ b/libbeat/processors/actions/replace_test.go @@ -237,7 +237,7 @@ func TestReplaceField(t *testing.T) { }, } - err := f.replaceField(test.Field, test.Pattern, test.Replacement, test.Input) + err := f.replaceField(test.Field, test.Pattern, test.Replacement, &beat.Event{Fields: test.Input}) if err != nil { assert.Equal(t, test.error, true) } @@ -245,4 +245,33 @@ func TestReplaceField(t *testing.T) { assert.True(t, reflect.DeepEqual(test.Input, test.Output)) }) } + + t.Run("supports metadata as a target", func(t *testing.T) { + event := &beat.Event{ + Meta: common.MapStr{ + "f": "abc", + }, + } + + expectedMeta := common.MapStr{ + "f": "bbc", + } + + f := &replaceString{ + config: replaceStringConfig{ + Fields: []replaceConfig{ + { + Field: "@metadata.f", + Pattern: regexp.MustCompile(`a`), + Replacement: "b", + }, + }, + }, + } + + newEvent, err := f.Run(event) + assert.NoError(t, err) + assert.Equal(t, expectedMeta, newEvent.Meta) + assert.Equal(t, event.Fields, newEvent.Fields) + }) } diff --git a/libbeat/processors/actions/truncate_fields.go b/libbeat/processors/actions/truncate_fields.go index ead07724fab..227bbee10fc 100644 --- a/libbeat/processors/actions/truncate_fields.go +++ b/libbeat/processors/actions/truncate_fields.go @@ -82,9 +82,9 @@ func NewTruncateFields(c *common.Config) (processors.Processor, error) { } func (f *truncateFields) Run(event *beat.Event) (*beat.Event, error) { - var backup common.MapStr + var backup *beat.Event if f.config.FailOnError { - backup = event.Fields.Clone() + backup = event.Clone() } for _, field := range f.config.Fields { @@ -92,7 +92,7 @@ func (f *truncateFields) Run(event *beat.Event) (*beat.Event, error) { if err != nil { f.logger.Debugf("Failed to truncate fields: %s", err) if f.config.FailOnError { - event.Fields = backup + event = backup return event, err } } diff --git a/libbeat/processors/actions/truncate_fields_test.go b/libbeat/processors/actions/truncate_fields_test.go index c19bc4e27ff..aac7ee662cc 100644 --- a/libbeat/processors/actions/truncate_fields_test.go +++ b/libbeat/processors/actions/truncate_fields_test.go @@ -178,4 +178,39 @@ func TestTruncateFields(t *testing.T) { assert.Equal(t, test.Output, newEvent.Fields) }) } + + t.Run("supports metadata as a target", func(t *testing.T) { + p := truncateFields{ + config: truncateFieldsConfig{ + Fields: []string{"@metadata.message"}, + MaxBytes: 3, + FailOnError: true, + }, + truncate: (*truncateFields).truncateBytes, + logger: log, + } + + event := &beat.Event{ + Meta: common.MapStr{ + "message": "too long line", + }, + Fields: common.MapStr{}, + } + + expFields := common.MapStr{ + "log": common.MapStr{ + "flags": []string{"truncated"}, + }, + } + + expMeta := common.MapStr{ + "message": "too", + } + + newEvent, err := p.Run(event) + assert.NoError(t, err) + + assert.Equal(t, expFields, newEvent.Fields) + assert.Equal(t, expMeta, newEvent.Meta) + }) } diff --git a/libbeat/processors/add_id/add_id_test.go b/libbeat/processors/add_id/add_id_test.go index 54ad579e9fd..a50349f22f0 100644 --- a/libbeat/processors/add_id/add_id_test.go +++ b/libbeat/processors/add_id/add_id_test.go @@ -63,3 +63,25 @@ func TestNonDefaultTargetField(t *testing.T) { assert.NoError(t, err) assert.Empty(t, v) } + +func TestNonDefaultMetadataTarget(t *testing.T) { + cfg := common.MustNewConfigFrom(common.MapStr{ + "target_field": "@metadata.foo", + }) + p, err := New(cfg) + assert.NoError(t, err) + + testEvent := &beat.Event{ + Meta: common.MapStr{}, + } + + newEvent, err := p.Run(testEvent) + assert.NoError(t, err) + + v, err := newEvent.Meta.GetValue("foo") + assert.NoError(t, err) + assert.NotEmpty(t, v) + + v, err = newEvent.GetValue("@metadata._id") + assert.Error(t, err) +} diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index 2730770bc86..cdb182e4d31 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -254,6 +254,9 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *common.Confi }) } +// Run runs the processor that adds a field `kubernetes` to the event fields that +// contains a map with various Kubernetes metadata. +// This processor does not access or modify the `Meta` of the event. func (k *kubernetesAnnotator) Run(event *beat.Event) (*beat.Event, error) { if !k.kubernetesAvailable { return event, nil diff --git a/libbeat/processors/add_process_metadata/add_process_metadata.go b/libbeat/processors/add_process_metadata/add_process_metadata.go index 31b9fa9a9db..e6f745e6e91 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata.go @@ -159,10 +159,10 @@ func containsValue(m common.MapStr, v string) bool { return false } -// Run enriches the given event with the host meta data +// Run enriches the given event with the host meta data. func (p *addProcessMetadata) Run(event *beat.Event) (*beat.Event, error) { for _, pidField := range p.config.MatchPIDs { - result, err := p.enrich(event.Fields, pidField) + result, err := p.enrich(event, pidField) if err != nil { switch err { case common.ErrKeyNotFound: @@ -174,7 +174,7 @@ func (p *addProcessMetadata) Run(event *beat.Event) (*beat.Event, error) { } } if result != nil { - event.Fields = result + event = result } return event, nil } @@ -209,7 +209,7 @@ func pidToInt(value interface{}) (pid int, err error) { return pid, nil } -func (p *addProcessMetadata) enrich(event common.MapStr, pidField string) (result common.MapStr, err error) { +func (p *addProcessMetadata) enrich(event *beat.Event, pidField string) (result *beat.Event, err error) { pidIf, err := event.GetValue(pidField) if err != nil { return nil, err @@ -253,7 +253,7 @@ func (p *addProcessMetadata) enrich(event common.MapStr, pidField string) (resul return nil, errors.New("source is not a string") } if !p.config.OverwriteKeys { - if found, _ := result.HasKey(dest); found { + if _, err := result.GetValue(dest); err == nil { return nil, errors.Errorf("target field '%s' already exists and overwrite_keys is false", dest) } } @@ -264,7 +264,7 @@ func (p *addProcessMetadata) enrich(event common.MapStr, pidField string) (resul continue } - if _, err = result.Put(dest, value); err != nil { + if _, err = result.PutValue(dest, value); err != nil { return nil, err } } diff --git a/libbeat/processors/add_process_metadata/add_process_metadata_test.go b/libbeat/processors/add_process_metadata/add_process_metadata_test.go index 51f4a33172e..461a71efa3d 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata_test.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata_test.go @@ -751,6 +751,42 @@ func TestAddProcessMetadata(t *testing.T) { } }) } + + t.Run("supports metadata as a target", func(t *testing.T) { + c := common.MapStr{ + "match_pids": []string{"@metadata.system.ppid"}, + "target": "@metadata", + "include_fields": []string{"process.name"}, + } + + config, err := common.NewConfigFrom(c) + assert.NoError(t, err) + + proc, err := newProcessMetadataProcessorWithProvider(config, testProcs, true) + assert.NoError(t, err) + + event := &beat.Event{ + Meta: common.MapStr{ + "system": common.MapStr{ + "ppid": "1", + }, + }, + Fields: common.MapStr{}, + } + expMeta := common.MapStr{ + "system": common.MapStr{ + "ppid": "1", + }, + "process": common.MapStr{ + "name": "systemd", + }, + } + + result, err := proc.Run(event) + assert.NoError(t, err) + assert.Equal(t, expMeta, result.Meta) + assert.Equal(t, event.Fields, result.Fields) + }) } func TestUsingCache(t *testing.T) { diff --git a/libbeat/processors/communityid/communityid_test.go b/libbeat/processors/communityid/communityid_test.go index 2356b005cea..0ab643d05de 100644 --- a/libbeat/processors/communityid/communityid_test.go +++ b/libbeat/processors/communityid/communityid_test.go @@ -147,6 +147,26 @@ func TestRun(t *testing.T) { e.Put("network.iana_number", tcpProtocol) testProcessor(t, 0, e, "1:LQU9qZlK+B5F3KDmev6m5PMibrg=") }) + + t.Run("supports metadata as a target", func(t *testing.T) { + event := &beat.Event{ + Fields: evt(), + Meta: common.MapStr{}, + } + c := defaultConfig() + c.Target = "@metadata.community_id" + c.Seed = 0 + p, err := newFromConfig(c) + assert.NoError(t, err) + + out, err := p.Run(event) + assert.NoError(t, err) + + id, err := out.Meta.GetValue("community_id") + assert.NoError(t, err) + + assert.EqualValues(t, "1:LQU9qZlK+B5F3KDmev6m5PMibrg=", id) + }) } func testProcessor(t testing.TB, seed uint16, fields common.MapStr, expectedHash interface{}) { diff --git a/libbeat/processors/convert/convert.go b/libbeat/processors/convert/convert.go index b781f37ed4a..999a5abd418 100644 --- a/libbeat/processors/convert/convert.go +++ b/libbeat/processors/convert/convert.go @@ -80,19 +80,19 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) { } // Backup original event. - saved := *event + saved := event + if len(p.Fields) > 1 && p.FailOnError { // Clone the fields to allow the processor to undo the operation on // failure (like a transaction). If there is only one conversion then // cloning is unnecessary because there are no previous changes to // rollback (so avoid the expensive clone operation). - saved.Fields = event.Fields.Clone() - saved.Meta = event.Meta.Clone() + saved = event.Clone() } // Update the event with the converted values. if err := p.writeToEvent(event, converted); err != nil { - return &saved, err + return saved, err } return event, nil diff --git a/libbeat/processors/convert/convert_test.go b/libbeat/processors/convert/convert_test.go index f761b047f4c..ea57413562f 100644 --- a/libbeat/processors/convert/convert_test.go +++ b/libbeat/processors/convert/convert_test.go @@ -146,6 +146,30 @@ func TestConvert(t *testing.T) { `"Tag":"convert_ip","IgnoreMissing":false,"FailOnError":true,"Mode":"copy"}`, p.String()) }) + + t.Run("metadata as a target", func(t *testing.T) { + c := defaultConfig() + c.Tag = "convert_ip" + c.Fields = append(c.Fields, field{From: "@metadata.source", To: "@metadata.dest", Type: Integer}) + + evt := &beat.Event{ + Meta: common.MapStr{ + "source": "1", + }, + } + expMeta := common.MapStr{ + "source": "1", + "dest": int32(1), + } + + p, err := newConvert(c) + assert.NoError(t, err) + + newEvt, err := p.Run(evt) + assert.NoError(t, err) + assert.Equal(t, expMeta, newEvt.Meta) + assert.Equal(t, evt.Fields, newEvt.Fields) + }) } func TestConvertRun(t *testing.T) { diff --git a/libbeat/processors/decode_csv_fields/decode_csv_fields.go b/libbeat/processors/decode_csv_fields/decode_csv_fields.go index ff2cdf0938c..f971e7e8f5b 100644 --- a/libbeat/processors/decode_csv_fields/decode_csv_fields.go +++ b/libbeat/processors/decode_csv_fields/decode_csv_fields.go @@ -100,14 +100,13 @@ func NewDecodeCSVField(c *common.Config) (processors.Processor, error) { // Run applies the decode_csv_field processor to an event. func (f *decodeCSVFields) Run(event *beat.Event) (*beat.Event, error) { - saved := *event + var saved *beat.Event if f.FailOnError { - saved.Fields = event.Fields.Clone() - saved.Meta = event.Meta.Clone() + saved = event.Clone() } for src, dest := range f.fields { if err := f.decodeCSVField(src, dest, event); err != nil && f.FailOnError { - return &saved, err + return saved, err } } return event, nil diff --git a/libbeat/processors/decode_csv_fields/decode_csv_fields_test.go b/libbeat/processors/decode_csv_fields/decode_csv_fields_test.go index 8a406ca06e6..e85d91873cf 100644 --- a/libbeat/processors/decode_csv_fields/decode_csv_fields_test.go +++ b/libbeat/processors/decode_csv_fields/decode_csv_fields_test.go @@ -338,6 +338,37 @@ func TestDecodeCSVField(t *testing.T) { assert.NoError(t, err) }) } + + t.Run("supports metadata as a target", func(t *testing.T) { + config := common.MapStr{ + "fields": common.MapStr{ + "@metadata": common.MapStr{ + "field": "@metadata.message", + }, + }, + } + + event := &beat.Event{ + Meta: common.MapStr{ + "field": "17,192.168.33.1,8.8.8.8", + }, + Fields: common.MapStr{}, + } + expMeta := common.MapStr{ + "field": "17,192.168.33.1,8.8.8.8", + "message": []string{"17", "192.168.33.1", "8.8.8.8"}, + } + + processor, err := NewDecodeCSVField(common.MustNewConfigFrom(config)) + if err != nil { + t.Fatal(err) + } + result, err := processor.Run(event) + assert.NoError(t, err) + assert.Equal(t, expMeta, result.Meta) + assert.Equal(t, event.Fields, result.Fields) + }) + } func TestDecodeCSVField_String(t *testing.T) { diff --git a/libbeat/processors/decode_xml/decode_xml_test.go b/libbeat/processors/decode_xml/decode_xml_test.go index 83ef61ea226..5a86787ae46 100644 --- a/libbeat/processors/decode_xml/decode_xml_test.go +++ b/libbeat/processors/decode_xml/decode_xml_test.go @@ -402,6 +402,54 @@ func TestDecodeXML(t *testing.T) { assert.Equal(t, test.Output, newEvent.Fields) }) } + + t.Run("supports metadata as a target", func(t *testing.T) { + t.Parallel() + target := "@metadata.xml" + config := decodeXMLConfig{ + Field: "@metadata.message", + Target: &target, + } + + f, err := newDecodeXML(config) + require.NoError(t, err) + + event := &beat.Event{ + Meta: common.MapStr{ + "message": ` + + William H. Gaddis + The Recognitions + One of the great seminal American novels of the 20th century. + + `, + }, + } + expMeta := common.MapStr{ + "xml": common.MapStr{ + "catalog": map[string]interface{}{ + "book": map[string]interface{}{ + "author": "William H. Gaddis", + "review": "One of the great seminal American novels of the 20th century.", + "seq": "1", + "title": "The Recognitions", + }, + }, + }, + "message": ` + + William H. Gaddis + The Recognitions + One of the great seminal American novels of the 20th century. + + `, + } + + newEvent, err := f.Run(event) + assert.NoError(t, err) + assert.Equal(t, expMeta, newEvent.Meta) + assert.Equal(t, event.Fields, newEvent.Fields) + }) } func BenchmarkProcessor_Run(b *testing.B) { diff --git a/libbeat/processors/decode_xml_wineventlog/processor_test.go b/libbeat/processors/decode_xml_wineventlog/processor_test.go index 1aef817968f..4fc3c92d4e7 100644 --- a/libbeat/processors/decode_xml_wineventlog/processor_test.go +++ b/libbeat/processors/decode_xml_wineventlog/processor_test.go @@ -28,6 +28,68 @@ import ( "github.com/elastic/beats/v7/libbeat/common" ) +var testMessage = "" + + "4672001254800x8020000000000000" + + "11303Securityvagrant" + + "S-1-5-18SYSTEMNT AUTHORITY0x3e7" + + "SeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\t" + + "SeRestorePrivilege\n\t\t\tSeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilege" + + "Special privileges assigned to new logon.\n\nSubject:\n\tSecurity ID:\t\tS-1-5-18\n\tAccount Name:\t\tSYSTEM\n\tAccount Domain:\t\tNT AUTHORITY\n\tLogon ID:\t\t0x3E7\n\n" + + "Privileges:\t\tSeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\tSeRestorePrivilege\n\t\t\t" + + "SeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilegeInformation" + + "Special LogonInfoSecurityMicrosoft Windows security auditing.Audit Success" + +var testMessageOutput = common.MapStr{ + "event": common.MapStr{ + "action": "Special Logon", + "code": "4672", + "kind": "event", + "outcome": "success", + "provider": "Microsoft-Windows-Security-Auditing", + }, + "host": common.MapStr{ + "name": "vagrant", + }, + "log": common.MapStr{ + "level": "information", + }, + "winlog": common.MapStr{ + "channel": "Security", + "outcome": "success", + "activity_id": "{ffb23523-1f32-0000-c335-b2ff321fd701}", + "level": "information", + "event_id": "4672", + "provider_name": "Microsoft-Windows-Security-Auditing", + "record_id": uint64(11303), + "computer_name": "vagrant", + "time_created": func() time.Time { + t, _ := time.Parse(time.RFC3339Nano, "2021-03-23T09:56:13.137310000Z") + return t + }(), + "opcode": "Info", + "provider_guid": "{54849625-5478-4994-a5ba-3e3b0328c30d}", + "event_data": common.MapStr{ + "SubjectUserSid": "S-1-5-18", + "SubjectUserName": "SYSTEM", + "SubjectDomainName": "NT AUTHORITY", + "SubjectLogonId": "0x3e7", + "PrivilegeList": "SeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\tSeRestorePrivilege\n\t\t\tSeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilege", + }, + "task": "Special Logon", + "keywords": []string{ + "Audit Success", + }, + "message": "Special privileges assigned to new logon.\n\nSubject:\n\tSecurity ID:\t\tS-1-5-18\n\tAccount Name:\t\tSYSTEM\n\tAccount Domain:\t\tNT AUTHORITY\n\tLogon ID:\t\t0x3E7\n\nPrivileges:\t\tSeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\tSeRestorePrivilege\n\t\t\tSeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilege", + "process": common.MapStr{ + "pid": uint32(652), + "thread": common.MapStr{ + "id": uint32(4660), + }, + }, + }, + "message": "Special privileges assigned to new logon.\n\nSubject:\n\tSecurity ID:\t\tS-1-5-18\n\tAccount Name:\t\tSYSTEM\n\tAccount Domain:\t\tNT AUTHORITY\n\tLogon ID:\t\t0x3E7\n\nPrivileges:\t\tSeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\tSeRestorePrivilege\n\t\t\tSeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilege", +} + func TestProcessor(t *testing.T) { var testCases = []struct { description string @@ -41,67 +103,9 @@ func TestProcessor(t *testing.T) { description: "Decodes properly with default config", config: defaultConfig(), Input: common.MapStr{ - "message": "" + - "4672001254800x8020000000000000" + - "11303Securityvagrant" + - "S-1-5-18SYSTEMNT AUTHORITY0x3e7" + - "SeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\t" + - "SeRestorePrivilege\n\t\t\tSeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilege" + - "Special privileges assigned to new logon.\n\nSubject:\n\tSecurity ID:\t\tS-1-5-18\n\tAccount Name:\t\tSYSTEM\n\tAccount Domain:\t\tNT AUTHORITY\n\tLogon ID:\t\t0x3E7\n\n" + - "Privileges:\t\tSeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\tSeRestorePrivilege\n\t\t\t" + - "SeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilegeInformation" + - "Special LogonInfoSecurityMicrosoft Windows security auditing.Audit Success", - }, - Output: common.MapStr{ - "event": common.MapStr{ - "action": "Special Logon", - "code": "4672", - "kind": "event", - "outcome": "success", - "provider": "Microsoft-Windows-Security-Auditing", - }, - "host": common.MapStr{ - "name": "vagrant", - }, - "log": common.MapStr{ - "level": "information", - }, - "winlog": common.MapStr{ - "channel": "Security", - "outcome": "success", - "activity_id": "{ffb23523-1f32-0000-c335-b2ff321fd701}", - "level": "information", - "event_id": "4672", - "provider_name": "Microsoft-Windows-Security-Auditing", - "record_id": uint64(11303), - "computer_name": "vagrant", - "time_created": func() time.Time { - t, _ := time.Parse(time.RFC3339Nano, "2021-03-23T09:56:13.137310000Z") - return t - }(), - "opcode": "Info", - "provider_guid": "{54849625-5478-4994-a5ba-3e3b0328c30d}", - "event_data": common.MapStr{ - "SubjectUserSid": "S-1-5-18", - "SubjectUserName": "SYSTEM", - "SubjectDomainName": "NT AUTHORITY", - "SubjectLogonId": "0x3e7", - "PrivilegeList": "SeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\tSeRestorePrivilege\n\t\t\tSeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilege", - }, - "task": "Special Logon", - "keywords": []string{ - "Audit Success", - }, - "message": "Special privileges assigned to new logon.\n\nSubject:\n\tSecurity ID:\t\tS-1-5-18\n\tAccount Name:\t\tSYSTEM\n\tAccount Domain:\t\tNT AUTHORITY\n\tLogon ID:\t\t0x3E7\n\nPrivileges:\t\tSeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\tSeRestorePrivilege\n\t\t\tSeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilege", - "process": common.MapStr{ - "pid": uint32(652), - "thread": common.MapStr{ - "id": uint32(4660), - }, - }, - }, - "message": "Special privileges assigned to new logon.\n\nSubject:\n\tSecurity ID:\t\tS-1-5-18\n\tAccount Name:\t\tSYSTEM\n\tAccount Domain:\t\tNT AUTHORITY\n\tLogon ID:\t\t0x3E7\n\nPrivileges:\t\tSeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\tSeRestorePrivilege\n\t\t\tSeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilege", + "message": testMessage, }, + Output: testMessageOutput, }, { description: "Decodes without ECS", @@ -112,62 +116,11 @@ func TestProcessor(t *testing.T) { Target: "winlog", }, Input: common.MapStr{ - "message": "" + - "4672001254800x8020000000000000" + - "11303Securityvagrant" + - "S-1-5-18SYSTEMNT AUTHORITY0x3e7" + - "SeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\t" + - "SeRestorePrivilege\n\t\t\tSeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilege" + - "Special privileges assigned to new logon.\n\nSubject:\n\tSecurity ID:\t\tS-1-5-18\n\tAccount Name:\t\tSYSTEM\n\tAccount Domain:\t\tNT AUTHORITY\n\tLogon ID:\t\t0x3E7\n\n" + - "Privileges:\t\tSeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\tSeRestorePrivilege\n\t\t\t" + - "SeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilegeInformation" + - "Special LogonInfoSecurityMicrosoft Windows security auditing.Audit Success", + "message": testMessage, }, Output: common.MapStr{ - "winlog": common.MapStr{ - "channel": "Security", - "outcome": "success", - "activity_id": "{ffb23523-1f32-0000-c335-b2ff321fd701}", - "level": "information", - "event_id": "4672", - "provider_name": "Microsoft-Windows-Security-Auditing", - "record_id": uint64(11303), - "computer_name": "vagrant", - "time_created": func() time.Time { - t, _ := time.Parse(time.RFC3339Nano, "2021-03-23T09:56:13.137310000Z") - return t - }(), - "opcode": "Info", - "provider_guid": "{54849625-5478-4994-a5ba-3e3b0328c30d}", - "event_data": common.MapStr{ - "SubjectUserSid": "S-1-5-18", - "SubjectUserName": "SYSTEM", - "SubjectDomainName": "NT AUTHORITY", - "SubjectLogonId": "0x3e7", - "PrivilegeList": "SeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\tSeRestorePrivilege\n\t\t\tSeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilege", - }, - "task": "Special Logon", - "keywords": []string{ - "Audit Success", - }, - "message": "Special privileges assigned to new logon.\n\nSubject:\n\tSecurity ID:\t\tS-1-5-18\n\tAccount Name:\t\tSYSTEM\n\tAccount Domain:\t\tNT AUTHORITY\n\tLogon ID:\t\t0x3E7\n\nPrivileges:\t\tSeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\tSeRestorePrivilege\n\t\t\tSeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilege", - "process": common.MapStr{ - "pid": uint32(652), - "thread": common.MapStr{ - "id": uint32(4660), - }, - }, - }, - "message": "" + - "4672001254800x8020000000000000" + - "11303Securityvagrant" + - "S-1-5-18SYSTEMNT AUTHORITY0x3e7" + - "SeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\t" + - "SeRestorePrivilege\n\t\t\tSeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilege" + - "Special privileges assigned to new logon.\n\nSubject:\n\tSecurity ID:\t\tS-1-5-18\n\tAccount Name:\t\tSYSTEM\n\tAccount Domain:\t\tNT AUTHORITY\n\tLogon ID:\t\t0x3E7\n\n" + - "Privileges:\t\tSeAssignPrimaryTokenPrivilege\n\t\t\tSeTcbPrivilege\n\t\t\tSeSecurityPrivilege\n\t\t\tSeTakeOwnershipPrivilege\n\t\t\tSeLoadDriverPrivilege\n\t\t\tSeBackupPrivilege\n\t\t\tSeRestorePrivilege\n\t\t\t" + - "SeDebugPrivilege\n\t\t\tSeAuditPrivilege\n\t\t\tSeSystemEnvironmentPrivilege\n\t\t\tSeImpersonatePrivilege\n\t\t\tSeDelegateSessionUserImpersonatePrivilegeInformation" + - "Special LogonInfoSecurityMicrosoft Windows security auditing.Audit Success", + "winlog": testMessageOutput["winlog"], + "message": testMessage, }, }, } @@ -194,4 +147,31 @@ func TestProcessor(t *testing.T) { assert.Equal(t, test.Output, newEvent.Fields) }) } + + t.Run("supports metadata as a target", func(t *testing.T) { + t.Parallel() + + config := defaultConfig() + config.Field = "@metadata.message" + config.Target = "@metadata.target" + + f, err := newProcessor(config) + require.NoError(t, err) + + event := &beat.Event{ + Fields: common.MapStr{}, + Meta: common.MapStr{ + "message": testMessage, + }, + } + + expMeta := common.MapStr{ + "message": testMessage, + "target": testMessageOutput["winlog"], + } + newEvent, err := f.Run(event) + assert.NoError(t, err) + assert.Equal(t, expMeta, newEvent.Meta) + assert.Equal(t, event.Fields, newEvent.Fields) + }) } diff --git a/libbeat/processors/dissect/processor.go b/libbeat/processors/dissect/processor.go index b3e8ac9f635..5b9edd3d7c2 100644 --- a/libbeat/processors/dissect/processor.go +++ b/libbeat/processors/dissect/processor.go @@ -104,21 +104,21 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) { return event, err } + backup := event.Clone() + if convertDataType { event, err = p.mapper(event, mapInterfaceToMapStr(mc)) } else { event, err = p.mapper(event, mapToMapStr(m)) } if err != nil { - return event, err + return backup, err } return event, nil } func (p *processor) mapper(event *beat.Event, m common.MapStr) (*beat.Event, error) { - copy := event.Fields.Clone() - prefix := "" if p.config.TargetPrefix != "" { prefix = p.config.TargetPrefix + "." @@ -129,7 +129,6 @@ func (p *processor) mapper(event *beat.Event, m common.MapStr) (*beat.Event, err if _, err := event.GetValue(prefixKey); err == common.ErrKeyNotFound || p.config.OverwriteKeys { event.PutValue(prefixKey, v) } else { - event.Fields = copy // When the target key exists but is a string instead of a map. if err != nil { return event, errors.Wrapf(err, "cannot override existing key with `%s`", prefixKey) diff --git a/libbeat/processors/dissect/processor_test.go b/libbeat/processors/dissect/processor_test.go index 5a3d0217021..f46aa8bd78f 100644 --- a/libbeat/processors/dissect/processor_test.go +++ b/libbeat/processors/dissect/processor_test.go @@ -152,6 +152,34 @@ func TestProcessor(t *testing.T) { } }) } + + t.Run("supports metadata as a target", func(t *testing.T) { + e := &beat.Event{ + Meta: common.MapStr{ + "message": "hello world", + }, + } + expMeta := common.MapStr{ + "message": "hello world", + "key": "world", + } + + c := map[string]interface{}{ + "tokenizer": "hello %{key}", + "field": "@metadata.message", + "target_prefix": "@metadata", + } + cfg, err := common.NewConfigFrom(c) + assert.NoError(t, err) + + processor, err := NewProcessor(cfg) + assert.NoError(t, err) + + newEvent, err := processor.Run(e) + assert.NoError(t, err) + assert.Equal(t, expMeta, newEvent.Meta) + assert.Equal(t, e.Fields, newEvent.Fields) + }) } func TestFieldDoesntExist(t *testing.T) { diff --git a/libbeat/processors/dns/dns_test.go b/libbeat/processors/dns/dns_test.go index 2b26ff50081..32f95c7b810 100644 --- a/libbeat/processors/dns/dns_test.go +++ b/libbeat/processors/dns/dns_test.go @@ -92,6 +92,36 @@ func TestDNSProcessorRun(t *testing.T) { v, _ := event.GetValue("source.domain") assert.Equal(t, gatewayName, v) }) + + t.Run("metadata target", func(t *testing.T) { + config := defaultConfig + config.reverseFlat = map[string]string{ + "@metadata.ip": "@metadata.domain", + } + + p := &processor{ + Config: config, + resolver: &stubResolver{}, + log: logp.NewLogger(logName), + } + + event := &beat.Event{ + Meta: common.MapStr{ + "ip": gatewayIP, + }, + } + + expMeta := common.MapStr{ + "ip": gatewayIP, + "domain": gatewayName, + } + + newEvent, err := p.Run(event) + assert.NoError(t, err) + assert.Equal(t, expMeta, newEvent.Meta) + assert.Equal(t, event.Fields, newEvent.Fields) + }) + } func TestDNSProcessorTagOnFailure(t *testing.T) { diff --git a/libbeat/processors/extract_array/extract_array.go b/libbeat/processors/extract_array/extract_array.go index 1c9fc05da93..4e833cc2851 100644 --- a/libbeat/processors/extract_array/extract_array.go +++ b/libbeat/processors/extract_array/extract_array.go @@ -129,10 +129,9 @@ func (f *extractArrayProcessor) Run(event *beat.Event) (*beat.Event, error) { return event, errors.Wrapf(err, "unsupported type for field %s: got: %s needed: array", f.config.Field, t.String()) } - saved := *event + saved := event if f.config.FailOnError { - saved.Fields = event.Fields.Clone() - saved.Meta = event.Meta.Clone() + saved = event.Clone() } n := array.Len() @@ -141,7 +140,7 @@ func (f *extractArrayProcessor) Run(event *beat.Event) (*beat.Event, error) { if !f.config.FailOnError { continue } - return &saved, errors.Errorf("index %d exceeds length of %d when processing mapping for field %s", mapping.from, n, mapping.to) + return saved, errors.Errorf("index %d exceeds length of %d when processing mapping for field %s", mapping.from, n, mapping.to) } cell := array.Index(mapping.from) // checking for CanInterface() here is done to prevent .Interface() from @@ -155,14 +154,14 @@ func (f *extractArrayProcessor) Run(event *beat.Event) (*beat.Event, error) { if !f.config.FailOnError { continue } - return &saved, errors.Errorf("target field %s already has a value. Set the overwrite_keys flag or drop/rename the field first", mapping.to) + return saved, errors.Errorf("target field %s already has a value. Set the overwrite_keys flag or drop/rename the field first", mapping.to) } } if _, err = event.PutValue(mapping.to, clone(cell.Interface())); err != nil { if !f.config.FailOnError { continue } - return &saved, errors.Wrapf(err, "failed setting field %s", mapping.to) + return saved, errors.Wrapf(err, "failed setting field %s", mapping.to) } } return event, nil diff --git a/libbeat/processors/extract_array/extract_array_test.go b/libbeat/processors/extract_array/extract_array_test.go index 1707e317530..b3d2aa63393 100644 --- a/libbeat/processors/extract_array/extract_array_test.go +++ b/libbeat/processors/extract_array/extract_array_test.go @@ -267,4 +267,38 @@ func TestExtractArrayProcessor_Run(t *testing.T) { t.Log(result) }) } + + t.Run("supports metadata as a target", func(t *testing.T) { + + config := common.MapStr{ + "field": "@metadata.array", + "mappings": common.MapStr{ + "@metadata.first": 1, + "@metadata.second": 2, + }, + } + + event := &beat.Event{ + Meta: common.MapStr{ + "array": []interface{}{"zero", 1, common.MapStr{"two": 2}}, + }, + } + + expMeta := common.MapStr{ + "array": []interface{}{"zero", 1, common.MapStr{"two": 2}}, + "first": 1, + "second": common.MapStr{"two": 2}, + } + + cfg := common.MustNewConfigFrom(config) + processor, err := New(cfg) + if err != nil { + t.Fatal(err) + } + result, err := processor.Run(event) + assert.NoError(t, err) + + assert.Equal(t, expMeta, result.Meta) + assert.Equal(t, event.Fields, result.Fields) + }) } diff --git a/libbeat/processors/fingerprint/fingerprint_test.go b/libbeat/processors/fingerprint/fingerprint_test.go index 044c57a2eed..8ee7c848d83 100644 --- a/libbeat/processors/fingerprint/fingerprint_test.go +++ b/libbeat/processors/fingerprint/fingerprint_test.go @@ -81,6 +81,31 @@ func TestWithConfig(t *testing.T) { assert.Equal(t, test.want, v) }) } + + t.Run("supports metadata as a target", func(t *testing.T) { + config := common.MustNewConfigFrom(common.MapStr{ + "fields": []string{"@metadata.message"}, + "target_field": "@metadata.fingerprint", + }) + p, err := New(config) + require.NoError(t, err) + + testEvent := &beat.Event{ + Timestamp: time.Unix(1635443183, 0), + Meta: common.MapStr{ + "message": "hello world", + }, + } + + expMeta := common.MapStr{ + "message": "hello world", + "fingerprint": "1a3fe8251076ed8de5fd99ce529d2b9971c54851d4d45f5a576bed91d0cc4202", + } + newEvent, err := p.Run(testEvent) + assert.NoError(t, err) + assert.Equal(t, expMeta, newEvent.Meta) + assert.Equal(t, testEvent.Fields, newEvent.Fields) + }) } func TestHashMethods(t *testing.T) { diff --git a/libbeat/processors/registered_domain/registered_domain_test.go b/libbeat/processors/registered_domain/registered_domain_test.go index 085a3eb787a..0b99e5ebdab 100644 --- a/libbeat/processors/registered_domain/registered_domain_test.go +++ b/libbeat/processors/registered_domain/registered_domain_test.go @@ -93,4 +93,30 @@ func TestProcessorRun(t *testing.T) { assert.Equal(t, tc.ETLD, etld) } } + + t.Run("support metadata as a target", func(t *testing.T) { + c := defaultConfig() + c.Field = "@metadata.domain" + c.TargetField = "@metadata.registered_domain" + c.TargetSubdomainField = "@metadata.subdomain" + c.TargetETLDField = "@metadata.etld" + p, err := newRegisteredDomain(c) + + evt := &beat.Event{ + Meta: common.MapStr{ + "domain": "www.google.com", + }, + } + expMeta := common.MapStr{ + "domain": "www.google.com", + "registered_domain": "google.com", + "subdomain": "www", + "etld": "com", + } + + newEvt, err := p.Run(evt) + assert.NoError(t, err) + assert.Equal(t, expMeta, newEvt.Meta) + assert.Equal(t, evt.Fields, newEvt.Fields) + }) } diff --git a/libbeat/processors/timestamp/timestamp_test.go b/libbeat/processors/timestamp/timestamp_test.go index 466db497cfd..54b7d8b9e95 100644 --- a/libbeat/processors/timestamp/timestamp_test.go +++ b/libbeat/processors/timestamp/timestamp_test.go @@ -302,3 +302,38 @@ func TestTimezone(t *testing.T) { }) } } + +func TestMetadataTarget(t *testing.T) { + datetime := "2006-01-02T15:04:05Z" + c := defaultConfig() + c.Field = "@metadata.time" + c.TargetField = "@metadata.ts" + c.Layouts = append(c.Layouts, time.RFC3339) + c.Timezone = cfgtype.MustNewTimezone("EST") + + p, err := newFromConfig(c) + if err != nil { + t.Fatal(err) + } + + evt := &beat.Event{ + Meta: common.MapStr{ + "time": datetime, + }, + } + + newEvt, err := p.Run(evt) + assert.NoError(t, err) + + expTs, err := time.Parse(time.RFC3339, datetime) + assert.NoError(t, err) + + expMeta := common.MapStr{ + "time": datetime, + "ts": expTs.UTC(), + } + + assert.Equal(t, expMeta, newEvt.Meta) + assert.Equal(t, evt.Fields, newEvt.Fields) + assert.Equal(t, evt.Timestamp, newEvt.Timestamp) +} diff --git a/libbeat/processors/translate_sid/translatesid_test.go b/libbeat/processors/translate_sid/translatesid_test.go index 267e726250d..3f27b6569ce 100644 --- a/libbeat/processors/translate_sid/translatesid_test.go +++ b/libbeat/processors/translate_sid/translatesid_test.go @@ -30,6 +30,7 @@ import ( "golang.org/x/sys/windows" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/winlogbeat/sys/winevent" ) @@ -83,6 +84,33 @@ func TestTranslateSID(t *testing.T) { } }) } + + t.Run("supports metadata as a target", func(t *testing.T) { + p, err := newFromConfig(config{ + Field: "@metadata.sid", + DomainTarget: "@metadata.domain", + AccountNameTarget: "@metadata.account", + AccountTypeTarget: "@metadata.type", + }) + assert.NoError(t, err) + evt := &beat.Event{ + Meta: common.MapStr{ + "sid": "S-1-5-7", + }, + } + + expMeta := common.MapStr{ + "sid": "S-1-5-32-544", + "domain": "BUILTIN", + "account": "Administrators", + "type": winevent.SidTypeAlias, + } + + newEvt, err := p.Run(evt) + assert.NoError(t, err) + assert.Equal(t, expMeta, newEvt.Meta) + assert.Equal(t, evt.Fields, newEvt.Fields) + }) } func TestTranslateSIDEmptyTarget(t *testing.T) { diff --git a/libbeat/processors/urldecode/urldecode.go b/libbeat/processors/urldecode/urldecode.go index 6fc9cb8386d..110b6387a2d 100644 --- a/libbeat/processors/urldecode/urldecode.go +++ b/libbeat/processors/urldecode/urldecode.go @@ -73,9 +73,9 @@ func New(c *common.Config) (processors.Processor, error) { } func (p *urlDecode) Run(event *beat.Event) (*beat.Event, error) { - var backup common.MapStr + var backup *beat.Event if p.config.FailOnError { - backup = event.Fields.Clone() + backup = event.Clone() } for _, field := range p.config.Fields { @@ -84,7 +84,7 @@ func (p *urlDecode) Run(event *beat.Event) (*beat.Event, error) { errMsg := fmt.Errorf("failed to decode fields in urldecode processor: %v", err) p.log.Debug(errMsg.Error()) if p.config.FailOnError { - event.Fields = backup + event = backup event.PutValue("error.message", errMsg.Error()) return event, err } diff --git a/libbeat/processors/urldecode/urldecode_test.go b/libbeat/processors/urldecode/urldecode_test.go index 8d962bd9470..7e32aad3a56 100644 --- a/libbeat/processors/urldecode/urldecode_test.go +++ b/libbeat/processors/urldecode/urldecode_test.go @@ -211,4 +211,32 @@ func TestURLDecode(t *testing.T) { }) } + t.Run("supports metadata as a target", func(t *testing.T) { + t.Parallel() + + config := urlDecodeConfig{ + Fields: []fromTo{{ + From: "@metadata.field", To: "@metadata.target", + }}, + } + + f := &urlDecode{ + log: logp.NewLogger("urldecode"), + config: config, + } + + event := &beat.Event{ + Meta: common.MapStr{ + "field": "correct%20data", + }, + } + expMeta := common.MapStr{ + "field": "correct%20data", + "target": "correct data", + } + newEvent, err := f.Run(event) + assert.NoError(t, err) + assert.Equal(t, expMeta, newEvent.Meta) + assert.Equal(t, event.Fields, newEvent.Fields) + }) }