Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add log.flags and object metadata to aws-s3 input events #26267

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add new `parser` to `filestream` input: `container`. {pull}26115[26115]
- Add support for ISO8601 timestamps in Zeek fileset {pull}25564[25564]
- Add `preserve_original_event` option to `o365audit` input. {pull}26273[26273]
- Add `log.flags` to events created by the `aws-s3` input. {pull}26267[26267]
- Add `include_s3_metadata` config option to the `aws-s3` input for including object metadata in events. {pull}26267[26267]

*Heartbeat*

Expand Down
23 changes: 21 additions & 2 deletions filebeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -125697,7 +125697,7 @@ S3 fields from s3 input.



*`bucket_name`*::
*`bucket.name`*::
+
--
Name of the S3 bucket that this log retrieved from.
Expand All @@ -125707,7 +125707,17 @@ type: keyword

--

*`object_key`*::
*`bucket.arn`*::
+
--
ARN of the S3 bucket that this log retrieved from.


type: keyword

--

*`object.key`*::
+
--
Name of the S3 object that this log retrieved from.
Expand All @@ -125717,6 +125727,15 @@ type: keyword

--

*`metadata`*::
+
--
AWS S3 object metadata values.

type: flattened

--

[[exported-fields-santa]]
== Google Santa fields

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,6 @@
# The duration (in seconds) that the received messages are hidden from subsequent
# retrieve requests after being retrieved by a ReceiveMessage request.
#visibility_timeout: 300

# List of S3 object metadata keys to include in events.
#include_s3_metadata: []
23 changes: 20 additions & 3 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,10 @@ setting. If `file_selectors` is given, then any global
`expand_event_list_from_field` value is ignored in favor of the ones
specified in the `file_selectors`. Regex syntax is the same as the Go
language. Files that don't match one of the regexes won't be
processed. <<input-aws-s3-content_type>>, <<input-aws-s3-multiline>>,
<<input-aws-s3-max_bytes>>, <<input-aws-s3-buffer_size>>, and
<<input-aws-s3-encoding>> may also be set for each file selector.
processed. <<input-aws-s3-content_type>>, <<input-aws-s3-include_s3_metadata>>,
<<input-aws-s3-multiline>>, <<input-aws-s3-max_bytes>>,
<<input-aws-s3-buffer_size>>, and <<input-aws-s3-encoding>> may also be set for
each file selector.

["source", "yml"]
----
Expand All @@ -133,6 +134,22 @@ Enabling this option changes the service name from `s3` to `s3-fips` for
connecting to the correct service endpoint. For example:
`s3-fips.us-gov-east-1.amazonaws.com`.

[id="input-{type}-include_s3_metadata"]
[float]
==== `include_s3_metadata`

This input can include S3 object metadata in the generated events for use in
follow-on processing. You must specify the list of keys to include. By default
none are included. If the key exists in the S3 response then it will be included
in the event as `aws.s3.metadata.<key>` where the key name as been normalized
to all lowercase.

----
include_s3_metadata:
- last-modified
- x-amz-version-id
----

[id="input-{type}-max_bytes"]
[float]
==== `max_bytes`
Expand Down
3 changes: 3 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3056,6 +3056,9 @@ filebeat.inputs:
# retrieve requests after being retrieved by a ReceiveMessage request.
#visibility_timeout: 300

# List of S3 object metadata keys to include in events.
#include_s3_metadata: []

# =========================== Filebeat autodiscover ============================

# Autodiscover allows you to detect changes in the system and spawn new modules
Expand Down
12 changes: 10 additions & 2 deletions x-pack/filebeat/input/awss3/_meta/fields.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,19 @@
S3 fields from s3 input.
release: ga
fields:
- name: bucket_name
- name: bucket.name
type: keyword
description: >
Name of the S3 bucket that this log retrieved from.
- name: object_key
- name: bucket.arn
type: keyword
description: >
ARN of the S3 bucket that this log retrieved from.
- name: object.key
type: keyword
description: >
Name of the S3 object that this log retrieved from.
- name: metadata
type: flattened
description:
AWS S3 object metadata values.
84 changes: 83 additions & 1 deletion x-pack/filebeat/input/awss3/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"io/ioutil"
"net/http"
"net/url"
"reflect"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -59,6 +60,7 @@ type s3Info struct {
key string
region string
arn string
meta map[string]interface{} // S3 object metadata.
readerConfig
}

Expand Down Expand Up @@ -403,6 +405,8 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
*resp.ContentType = info.readerConfig.ContentType
}

info.meta = s3Metadata(resp, info.IncludeS3Metadata...)

// Decode JSON documents when content-type is "application/json" or expand_event_list_from_field is given in config
if resp.ContentType != nil && *resp.ContentType == "application/json" || info.ExpandEventListFromField != "" {
decoder := json.NewDecoder(bodyReader)
Expand Down Expand Up @@ -454,6 +458,7 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
return fmt.Errorf("error reading message: %w", err)
}
event := createEvent(string(message.Content), offset, info, objectHash, s3Ctx)
event.Fields.DeepUpdate(message.Fields)
offset += int64(message.Bytes)
if err = c.forwardEvent(event); err != nil {
return fmt.Errorf("forwardEvent failed: %w", err)
Expand Down Expand Up @@ -610,7 +615,9 @@ func createEvent(log string, offset int64, info s3Info, objectHash string, s3Ctx
"bucket": common.MapStr{
"name": info.name,
"arn": info.arn},
"object.key": info.key,
"object": common.MapStr{
"key": info.key,
},
},
},
"cloud": common.MapStr{
Expand All @@ -622,6 +629,10 @@ func createEvent(log string, offset int64, info s3Info, objectHash string, s3Ctx
}
event.SetID(objectID(objectHash, offset))

if len(info.meta) > 0 {
event.Fields.Put("aws.s3.metadata", info.meta)
}

return event
}

Expand All @@ -641,6 +652,77 @@ func s3ObjectHash(s3Info s3Info) string {
return prefix[:10]
}

// s3Metadata returns a map containing the selected S3 object metadata keys.
func s3Metadata(resp *s3.GetObjectResponse, keys ...string) common.MapStr {
if len(keys) == 0 {
return nil
}

// When you upload objects using the REST API, the optional user-defined
// metadata names must begin with "x-amz-meta-" to distinguish them from
// other HTTP headers.
const userMetaPrefix = "x-amz-meta-"

allMeta := map[string]interface{}{}

// Get headers using AWS SDK struct tags.
fields := reflect.TypeOf(resp.GetObjectOutput).Elem()
values := reflect.ValueOf(resp.GetObjectOutput).Elem()
for i := 0; i < fields.NumField(); i++ {
f := fields.Field(i)

if loc, _ := f.Tag.Lookup("location"); loc != "header" {
continue
}

name, found := f.Tag.Lookup("locationName")
if !found {
continue
}
name = strings.ToLower(name)

if name == userMetaPrefix {
continue
}

v := values.Field(i)
switch v.Kind() {
case reflect.Ptr:
if v.IsNil() {
continue
}
v = v.Elem()
default:
if v.IsZero() {
continue
}
}

allMeta[name] = v.Interface()
}

// Add in the user defined headers.
for k, v := range resp.Metadata {
k = strings.ToLower(k)
allMeta[userMetaPrefix+k] = v
}

// Select the matching headers from the config.
metadata := common.MapStr{}
for _, key := range keys {
key = strings.ToLower(key)

v, found := allMeta[key]
if !found {
continue
}

metadata[key] = v
}

return metadata
}

func (c *s3Context) setError(err error) {
// only care about the last error for now
// TODO: add "Typed" error to error for context
Expand Down
16 changes: 16 additions & 0 deletions x-pack/filebeat/input/awss3/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"io/ioutil"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -489,3 +490,18 @@ func TestTrimLogDelimiter(t *testing.T) {
})
}
}

func TestS3Metadata(t *testing.T) {
resp := &s3.GetObjectResponse{
GetObjectOutput: &s3.GetObjectOutput{
ContentEncoding: awssdk.String("gzip"),
Metadata: map[string]string{
"Owner": "foo",
},
LastModified: awssdk.Time(time.Now()),
},
}

meta := s3Metadata(resp, "Content-Encoding", "x-amz-meta-owner", "last-modified")
assert.Len(t, meta, 3)
}
1 change: 1 addition & 0 deletions x-pack/filebeat/input/awss3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type readerConfig struct {
LineTerminator readfile.LineTerminator `config:"line_terminator"`
Encoding string `config:"encoding"`
ContentType string `config:"content_type"`
IncludeS3Metadata []string `config:"include_s3_metadata"`
}

func (f *readerConfig) Validate() error {
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/fields.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.