Skip to content

Commit

Permalink
Add metadata change support for processors (#30183)
Browse files Browse the repository at this point in the history
Some of the processors that are configured with a target field didn't
support special cases with the target field is a `@timestamp` or a
`@metadata` sub-field. This change is to make it consistent across
processors or to document why the processor does not need this change.

Also added tests for each processor that is supposed to support metadata as a target.
  • Loading branch information
rdner authored Feb 4, 2022
1 parent 8c79b67 commit ba6d176
Show file tree
Hide file tree
Showing 50 changed files with 987 additions and 198 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif
- The extension of the log files of Beats and Elastic Agent is changed to `.ndjson`. If you are collecting the logs, you must change the path configuration to `/path/to/logs/{beatname}*.ndjson` to avoid any issues. {pull}28927[28927]
- Remove legacy support for SSLv3. {pull}30071[30071]
- `add_fields` processor is now able to set metadata in events {pull}30092[30092]
- Add metadata change support for some processors {pull}30183[30183]

*Auditbeat*

Expand Down
11 changes: 11 additions & 0 deletions libbeat/beat/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,17 @@ func (e *Event) GetValue(key string) (interface{}, error) {
return e.Fields.GetValue(key)
}

// Clone creates an exact copy of the event
func (e *Event) Clone() *Event {
return &Event{
Timestamp: e.Timestamp,
Meta: e.Meta.Clone(),
Fields: e.Fields.Clone(),
Private: e.Private,
TimeSeries: e.TimeSeries,
}
}

// DeepUpdate recursively copies the key-value pairs from `d` to various properties of the event.
// When the key equals `@timestamp` it's set as the `Timestamp` property of the event.
// When the key equals `@metadata` the update is routed into the `Meta` map instead of `Fields`
Expand Down
26 changes: 26 additions & 0 deletions libbeat/processors/actions/add_network_direction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,30 @@ func TestNetworkDirection(t *testing.T) {
}
})
}

t.Run("supports metadata as a target", func(t *testing.T) {
evt := beat.Event{
Meta: common.MapStr{},
Fields: common.MapStr{
"source": "1.1.1.1",
"destination": "8.8.8.8",
},
}
p, err := NewAddNetworkDirection(common.MustNewConfigFrom(map[string]interface{}{
"source": "source",
"destination": "destination",
"target": "@metadata.direction",
"internal_networks": "private",
}))
require.NoError(t, err)

expectedMeta := common.MapStr{
"direction": "external",
}

observed, err := p.Run(&evt)
require.NoError(t, err)
require.Equal(t, expectedMeta, observed.Meta)
require.Equal(t, evt.Fields, observed.Fields)
})
}
18 changes: 9 additions & 9 deletions libbeat/processors/actions/copy_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,18 @@ func NewCopyFields(c *common.Config) (processors.Processor, error) {
}

func (f *copyFields) Run(event *beat.Event) (*beat.Event, error) {
var backup common.MapStr
var backup *beat.Event
if f.config.FailOnError {
backup = event.Fields.Clone()
backup = event.Clone()
}

for _, field := range f.config.Fields {
err := f.copyField(field.From, field.To, event.Fields)
err := f.copyField(field.From, field.To, event)
if err != nil {
errMsg := fmt.Errorf("Failed to copy fields in copy_fields processor: %s", err)
f.logger.Debug(errMsg.Error())
if f.config.FailOnError {
event.Fields = backup
event = backup
event.PutValue("error.message", errMsg.Error())
return event, err
}
Expand All @@ -90,21 +90,21 @@ func (f *copyFields) Run(event *beat.Event) (*beat.Event, error) {
return event, nil
}

func (f *copyFields) copyField(from string, to string, fields common.MapStr) error {
exists, _ := fields.HasKey(to)
if exists {
func (f *copyFields) copyField(from string, to string, event *beat.Event) error {
_, err := event.GetValue(to)
if err == nil {
return fmt.Errorf("target field %s already exists, drop or rename this field first", to)
}

value, err := fields.GetValue(from)
value, err := event.GetValue(from)
if err != nil {
if f.config.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound {
return nil
}
return fmt.Errorf("could not fetch value for key: %s, Error: %s", from, err)
}

_, err = fields.Put(to, cloneValue(value))
_, err = event.PutValue(to, cloneValue(value))
if err != nil {
return fmt.Errorf("could not copy value to %s: %v, %+v", to, value, err)
}
Expand Down
31 changes: 31 additions & 0 deletions libbeat/processors/actions/copy_fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,35 @@ func TestCopyFields(t *testing.T) {
assert.Equal(t, test.Expected, newEvent.Fields)
})
}

t.Run("supports metadata as a target", func(t *testing.T) {
p := copyFields{
copyFieldsConfig{
Fields: []fromTo{
{
From: "@metadata.message",
To: "@metadata.message_copied",
},
},
},
log,
}

event := &beat.Event{
Meta: common.MapStr{
"message": "please copy this line",
},
}

expMeta := common.MapStr{
"message": "please copy this line",
"message_copied": "please copy this line",
}

newEvent, err := p.Run(event)
assert.NoError(t, err)

assert.Equal(t, expMeta, newEvent.Meta)
assert.Equal(t, event.Fields, newEvent.Fields)
})
}
6 changes: 3 additions & 3 deletions libbeat/processors/actions/decode_base64_field.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,18 @@ func NewDecodeBase64Field(c *common.Config) (processors.Processor, error) {
}

func (f *decodeBase64Field) Run(event *beat.Event) (*beat.Event, error) {
var backup common.MapStr
var backup *beat.Event
// Creates a copy of the event to revert in case of failure
if f.config.FailOnError {
backup = event.Fields.Clone()
backup = event.Clone()
}

err := f.decodeField(event)
if err != nil {
errMsg := fmt.Errorf("failed to decode base64 fields in processor: %v", err)
f.log.Debug(errMsg.Error())
if f.config.FailOnError {
event.Fields = backup
event = backup
event.PutValue("error.message", errMsg.Error())
return event, err
}
Expand Down
33 changes: 33 additions & 0 deletions libbeat/processors/actions/decode_base64_field_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,4 +221,37 @@ func TestDecodeBase64Run(t *testing.T) {
assert.Equal(t, test.Output, newEvent.Fields)
})
}

t.Run("supports a metadata field", func(t *testing.T) {
config := base64Config{
Field: fromTo{
From: "field1",
To: "@metadata.field",
},
}

event := &beat.Event{
Meta: common.MapStr{},
Fields: common.MapStr{
"field1": "Y29ycmVjdCBkYXRh",
},
}

f := &decodeBase64Field{
log: logp.NewLogger(processorName),
config: config,
}

expectedFields := common.MapStr{
"field1": "Y29ycmVjdCBkYXRh",
}
expectedMeta := common.MapStr{
"field": "correct data",
}

newEvent, err := f.Run(event)
assert.NoError(t, err)
assert.Equal(t, expectedFields, newEvent.Fields)
assert.Equal(t, expectedMeta, newEvent.Meta)
})
}
40 changes: 40 additions & 0 deletions libbeat/processors/actions/decode_json_fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,46 @@ func TestTargetRootOption(t *testing.T) {
assert.Equal(t, expected.String(), actual.String())
}

func TestTargetMetadata(t *testing.T) {
event := &beat.Event{
Fields: common.MapStr{
"msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}",
"pipeline": "us1",
},
Meta: common.MapStr{},
}

testConfig, _ = common.NewConfigFrom(map[string]interface{}{
"fields": fields,
"process_array": false,
"max_depth": 2,
"target": "@metadata.json",
})

log := logp.NewLogger("decode_json_fields_test")

p, err := NewDecodeJSONFields(testConfig)
if err != nil {
log.Error("Error initializing decode_json_fields")
t.Fatal(err)
}

actual, _ := p.Run(event)

expectedMeta := common.MapStr{
"json": map[string]interface{}{
"log": map[string]interface{}{
"level": "info",
},
"stream": "stderr",
"count": int64(3),
},
}

assert.Equal(t, expectedMeta, actual.Meta)
assert.Equal(t, event.Fields, actual.Fields)
}

func TestNotJsonObjectOrArray(t *testing.T) {
var cases = []struct {
MaxDepth int
Expand Down
6 changes: 3 additions & 3 deletions libbeat/processors/actions/decompress_gzip_field.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,17 @@ func NewDecompressGzipFields(c *common.Config) (processors.Processor, error) {

// Run applies the decompress_gzip_fields processor to an event.
func (f *decompressGzipField) Run(event *beat.Event) (*beat.Event, error) {
var backup common.MapStr
var backup *beat.Event
if f.config.FailOnError {
backup = event.Fields.Clone()
backup = event.Clone()
}

err := f.decompressGzipField(event)
if err != nil {
errMsg := fmt.Errorf("Failed to decompress field in decompress_gzip_field processor: %v", err)
f.log.Debug(errMsg.Error())
if f.config.FailOnError {
event.Fields = backup
event = backup
event.PutValue("error.message", errMsg.Error())
return event, err
}
Expand Down
34 changes: 34 additions & 0 deletions libbeat/processors/actions/decompress_gzip_field_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,38 @@ func TestDecompressGzip(t *testing.T) {
assert.Equal(t, test.output, newEvent.Fields)
})
}

t.Run("supports metadata as a target", func(t *testing.T) {
t.Parallel()

event := &beat.Event{
Fields: common.MapStr{
"field1": []byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 74, 73, 77, 206, 207, 45, 40, 74, 45, 46, 78, 77, 81, 72, 73, 44, 73, 4, 4, 0, 0, 255, 255, 108, 158, 105, 19, 17, 0, 0, 0},
},
Meta: common.MapStr{},
}

expectedMeta := common.MapStr{
"field": "decompressed data",
}

config := decompressGzipFieldConfig{
Field: fromTo{
From: "field1", To: "@metadata.field",
},
IgnoreMissing: false,
FailOnError: true,
}

f := &decompressGzipField{
log: logp.NewLogger("decompress_gzip_field"),
config: config,
}

newEvent, err := f.Run(event)
assert.NoError(t, err)

assert.Equal(t, expectedMeta, newEvent.Meta)
assert.Equal(t, event.Fields, newEvent.Fields)
})
}
8 changes: 3 additions & 5 deletions libbeat/processors/actions/detect_mime_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewDetectMimeType(cfg *common.Config) (processors.Processor, error) {
func (m *mimeTypeProcessor) Run(event *beat.Event) (*beat.Event, error) {
valI, err := event.GetValue(m.Field)
if err != nil {
// doesn't have the required fieldd value to analyze
// doesn't have the required field value to analyze
return event, nil
}
val, _ := valI.(string)
Expand All @@ -63,11 +63,9 @@ func (m *mimeTypeProcessor) Run(event *beat.Event) (*beat.Event, error) {
return event, nil
}
if mimeType := mime.Detect(val); mimeType != "" {
event.Fields.DeepUpdate(common.MapStr{
m.Target: mimeType,
})
_, err = event.PutValue(m.Target, mimeType)
}
return event, nil
return event, err
}

func (m *mimeTypeProcessor) String() string {
Expand Down
22 changes: 22 additions & 0 deletions libbeat/processors/actions/detect_mime_type_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,28 @@ func TestMimeTypeFromTo(t *testing.T) {
require.Equal(t, "text/plain; charset=utf-8", enriched)
}

func TestMimeTypeFromToMetadata(t *testing.T) {
evt := beat.Event{
Meta: common.MapStr{},
Fields: common.MapStr{
"foo.bar.baz": "hello world!",
},
}
expectedMeta := common.MapStr{
"field": "text/plain; charset=utf-8",
}
p, err := NewDetectMimeType(common.MustNewConfigFrom(map[string]interface{}{
"field": "foo.bar.baz",
"target": "@metadata.field",
}))
require.NoError(t, err)

observed, err := p.Run(&evt)
require.NoError(t, err)
require.Equal(t, expectedMeta, observed.Meta)
require.Equal(t, evt.Fields, observed.Fields)
}

func TestMimeTypeTestNoMatch(t *testing.T) {
evt := beat.Event{
Fields: common.MapStr{
Expand Down
19 changes: 16 additions & 3 deletions libbeat/processors/actions/docs/add_fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
++++

The `add_fields` processor adds additional fields to the event. Fields can be
scalar values, arrays, dictionaries, or any nested combination of these.
The `add_fields` processor will overwrite the target field if it already exists.
scalar values, arrays, dictionaries, or any nested combination of these.
The `add_fields` processor will overwrite the target field if it already exists.
By default the fields that you specify will be grouped under the `fields`
sub-dictionary in the event. To group the fields under a different
sub-dictionary, use the `target` setting. To store the fields as
top-level fields, set `target: ''`.

`target`:: (Optional) Sub-dictionary to put all fields into. Defaults to `fields`.
`target`:: (Optional) Sub-dictionary to put all fields into. Defaults to `fields`. Setting this to `@metadata` will add values to the event metadata instead of fields.
`fields`:: Fields to be added.


Expand All @@ -40,3 +40,16 @@ Adds these fields to any event:
}
}
-------------------------------------------------------------------------------

This configuration will alter the event metadata:

[source,yaml]
------------------------------------------------------------------------------
processors:
- add_fields:
target: '@metadata'
fields:
op_type: "index"
------------------------------------------------------------------------------

When the event is ingested (e.g. by Elastisearch) the document will have `op_type: "index"` set as a metadata field.
Loading

0 comments on commit ba6d176

Please sign in to comment.