Skip to content

Commit

Permalink
[Filebeat] Add support for Cloudtrail digest files (elastic#21086) (e…
Browse files Browse the repository at this point in the history
…lastic#21089)

* Add support for Cloudtrail digest files

- allow file matching with file_selectors in s3 input
- update cloudtrail pipeline
- update cloudtrail config to use file_selectors
- add cloudtrail digest fields
- add cloudtrail insight fields

Closes elastic#20943

(cherry picked from commit 6e3cb57)
  • Loading branch information
leehinman authored Sep 15, 2020
1 parent 6aeff15 commit 0da7f36
Show file tree
Hide file tree
Showing 19 changed files with 614 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fixing `ingress_controller.` fields to be of type keyword instead of text. {issue}17834[17834]
- Fixed typo in log message. {pull}17897[17897]
- Fix an error updating file size being logged when EOF is reached. {pull}21048[21048]
- Fix error when processing AWS Cloudtrail Digest logs. {pull}21086[21086] {issue}20943[20943]

*Heartbeat*

Expand Down
114 changes: 114 additions & 0 deletions filebeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1487,6 +1487,120 @@ type: flattened

--

[float]
=== digest

Fields from Cloudtrail Digest Logs


*`aws.cloudtrail.digest.log_files`*::
+
--
A list of Logfiles contained in the digest.

type: nested

--

*`aws.cloudtrail.digest.start_time`*::
+
--
The starting UTC time range that the digest file covers, taking as a reference the time in which log files have been delivered by CloudTrail.

type: date

--

*`aws.cloudtrail.digest.end_time`*::
+
--
The ending UTC time range that the digest file covers, taking as a reference the time in which log files have been delivered by CloudTrail.

type: date

--

*`aws.cloudtrail.digest.s3_bucket`*::
+
--
The name of the Amazon S3 bucket to which the current digest file has been delivered.

type: keyword

--

*`aws.cloudtrail.digest.s3_object`*::
+
--
The Amazon S3 object key (that is, the Amazon S3 bucket location) of the current digest file.

type: keyword

--

*`aws.cloudtrail.digest.newest_event_time`*::
+
--
The UTC time of the most recent event among all of the events in the log files in the digest.

type: date

--

*`aws.cloudtrail.digest.oldest_event_time`*::
+
--
The UTC time of the oldest event among all of the events in the log files in the digest.

type: date

--

*`aws.cloudtrail.digest.previous_s3_bucket`*::
+
--
The Amazon S3 bucket to which the previous digest file was delivered.

type: keyword

--

*`aws.cloudtrail.digest.previous_hash_algorithm`*::
+
--
The name of the hash algorithm that was used to hash the previous digest file.

type: keyword

--

*`aws.cloudtrail.digest.public_key_fingerprint`*::
+
--
The hexadecimal encoded fingerprint of the public key that matches the private key used to sign this digest file.

type: keyword

--

*`aws.cloudtrail.digest.signature_algorithm`*::
+
--
The algorithm used to sign the digest file.

type: keyword

--

*`aws.cloudtrail.insight_details`*::
+
--
Shows information about the underlying triggers of an Insights event, such as event source, user agent, statistics, API name, and whether the event is the start or end of the Insights event.

type: flattened

--

[float]
=== cloudwatch

Expand Down
2 changes: 1 addition & 1 deletion filebeat/tests/system/test_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def clean_keys(obj):
host_keys.append("host.name")

# The create timestamps area always new
time_keys = ["event.created"]
time_keys = ["event.created", "event.ingested"]
# source path and agent.version can be different for each run
other_keys = ["log.file.path", "agent.version"]
# ECS versions change for any ECS release, large or small
Expand Down
25 changes: 25 additions & 0 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,31 @@ type will not be checked.
If a file has "application/json" content-type, `expand_event_list_from_field`
becomes required to read the json file.

[float]
==== `file_selectors`

If the SQS queue will have events that correspond to files that
{beatname_uc} shouldn't process `file_selectors` can be used to limit
the files that are downloaded. This is a list of selectors which are
made up of `regex` and `expand_event_list_from_field` options. The
`regex` should match the S3 object key in the SQS message, and the
optional `expand_event_list_from_field` is the same as the global
setting. If `file_selectors` is given, then any global
`expand_event_list_from_field` value is ignored in favor of the ones
specified in the `file_selectors`. Regex syntax is the same as the Go
language. Files that don't match one of the regexes won't be
processed.

["source", "yml"]
----
file_selectors:
- regex: '^AWSLogs/\d+/CloudTrail/'
expand_event_list_from_field: 'Records'
- regex: '^AWSLogs/\d+/CloudTrail-Digest'
```
----


[float]
==== `api_timeout`

Expand Down
12 changes: 12 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,18 @@ filebeat.modules:
# AWS SQS queue url
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue

# Process CloudTrail logs
# default is true, set to false to skip Cloudtrail logs
# var.process_cloudtrail_logs: false

# Process CloudTrail Digest logs
# default true, set to false to skip CloudTrail Digest logs
# var.process_digest_logs: false

# Process CloudTrail Insight logs
# default true, set to false to skip CloudTrail Insight logs
# var.process_insight_logs: false

# Filename of AWS credential file
# If not set "$HOME/.aws/credentials" is used on Linux/Mac
# "%UserProfile%\.aws\credentials" is used on Windows
Expand Down
16 changes: 16 additions & 0 deletions x-pack/filebeat/input/s3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package s3

import (
"fmt"
"regexp"
"time"

"github.com/elastic/beats/v7/filebeat/harvester"
Expand All @@ -19,6 +20,14 @@ type config struct {
AwsConfig awscommon.ConfigAWS `config:",inline"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
APITimeout time.Duration `config:"api_timeout"`
FileSelectors []FileSelectorCfg `config:"file_selectors"`
}

// FileSelectorCfg defines type and configuration of FileSelectors
type FileSelectorCfg struct {
RegexString string `config:"regex"`
Regex *regexp.Regexp `config:",ignore"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
}

func defaultConfig() config {
Expand All @@ -40,5 +49,12 @@ func (c *config) Validate() error {
return fmt.Errorf("api timeout %v needs to be larger than"+
" 0s and smaller than half of the visibility timeout", c.APITimeout)
}
for i := range c.FileSelectors {
r, err := regexp.Compile(c.FileSelectors[i].RegexString)
if err != nil {
return err
}
c.FileSelectors[i].Regex = r
}
return nil
}
70 changes: 45 additions & 25 deletions x-pack/filebeat/input/s3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,11 @@ type s3Input struct {
}

type s3Info struct {
name string
key string
region string
arn string
name string
key string
region string
arn string
expandEventListFromField string
}

type bucket struct {
Expand Down Expand Up @@ -252,7 +253,7 @@ func (p *s3Input) processor(queueURL string, messages []sqs.Message, visibilityT
func (p *s3Input) processMessage(svcS3 s3iface.ClientAPI, message sqs.Message, wg *sync.WaitGroup, errC chan error) {
defer wg.Done()

s3Infos, err := handleSQSMessage(message)
s3Infos, err := p.handleSQSMessage(message)
if err != nil {
p.logger.Error(errors.Wrap(err, "handleSQSMessage failed"))
return
Expand Down Expand Up @@ -352,7 +353,7 @@ func getRegionFromQueueURL(queueURL string) (string, error) {
}

// handle message
func handleSQSMessage(m sqs.Message) ([]s3Info, error) {
func (p *s3Input) handleSQSMessage(m sqs.Message) ([]s3Info, error) {
msg := sqsMessage{}
err := json.Unmarshal([]byte(*m.Body), &msg)
if err != nil {
Expand All @@ -361,21 +362,40 @@ func handleSQSMessage(m sqs.Message) ([]s3Info, error) {

var s3Infos []s3Info
for _, record := range msg.Records {
if record.EventSource == "aws:s3" && strings.HasPrefix(record.EventName, "ObjectCreated:") {
// Unescape substrings from s3 log name. For example, convert "%3D" back to "="
filename, err := url.QueryUnescape(record.S3.object.Key)
if err != nil {
return nil, errors.Wrapf(err, "url.QueryUnescape failed for '%s'", record.S3.object.Key)
}
if record.EventSource != "aws:s3" || !strings.HasPrefix(record.EventName, "ObjectCreated:") {
return nil, errors.New("this SQS queue should be dedicated to s3 ObjectCreated event notifications")
}
// Unescape substrings from s3 log name. For example, convert "%3D" back to "="
filename, err := url.QueryUnescape(record.S3.object.Key)
if err != nil {
return nil, errors.Wrapf(err, "url.QueryUnescape failed for '%s'", record.S3.object.Key)
}

if len(p.config.FileSelectors) == 0 {
s3Infos = append(s3Infos, s3Info{
region: record.AwsRegion,
name: record.S3.bucket.Name,
key: filename,
arn: record.S3.bucket.Arn,
region: record.AwsRegion,
name: record.S3.bucket.Name,
key: filename,
arn: record.S3.bucket.Arn,
expandEventListFromField: p.config.ExpandEventListFromField,
})
} else {
return nil, errors.New("this SQS queue should be dedicated to s3 ObjectCreated event notifications")
continue
}

for _, fs := range p.config.FileSelectors {
if fs.Regex == nil {
continue
}
if fs.Regex.MatchString(filename) {
s3Infos = append(s3Infos, s3Info{
region: record.AwsRegion,
name: record.S3.bucket.Name,
key: filename,
arn: record.S3.bucket.Arn,
expandEventListFromField: fs.ExpandEventListFromField,
})
break
}
}
}
return s3Infos, nil
Expand Down Expand Up @@ -456,7 +476,7 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C
}

// Decode JSON documents when content-type is "application/json" or expand_event_list_from_field is given in config
if resp.ContentType != nil && *resp.ContentType == "application/json" || p.config.ExpandEventListFromField != "" {
if resp.ContentType != nil && *resp.ContentType == "application/json" || info.expandEventListFromField != "" {
decoder := json.NewDecoder(reader)
err := p.decodeJSON(decoder, objectHash, info, s3Ctx)
if err != nil {
Expand Down Expand Up @@ -537,10 +557,10 @@ func (p *s3Input) decodeJSON(decoder *json.Decoder, objectHash string, s3Info s3
func (p *s3Input) jsonFieldsType(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Ctx *s3Context) (int, error) {
switch f := jsonFields.(type) {
case map[string][]interface{}:
if p.config.ExpandEventListFromField != "" {
textValues, ok := f[p.config.ExpandEventListFromField]
if s3Info.expandEventListFromField != "" {
textValues, ok := f[s3Info.expandEventListFromField]
if !ok {
err := errors.Errorf("key '%s' not found", p.config.ExpandEventListFromField)
err := errors.Errorf("key '%s' not found", s3Info.expandEventListFromField)
p.logger.Error(err)
return offset, err
}
Expand All @@ -555,10 +575,10 @@ func (p *s3Input) jsonFieldsType(jsonFields interface{}, offset int, objectHash
return offset, nil
}
case map[string]interface{}:
if p.config.ExpandEventListFromField != "" {
textValues, ok := f[p.config.ExpandEventListFromField]
if s3Info.expandEventListFromField != "" {
textValues, ok := f[s3Info.expandEventListFromField]
if !ok {
err := errors.Errorf("key '%s' not found", p.config.ExpandEventListFromField)
err := errors.Errorf("key '%s' not found", s3Info.expandEventListFromField)
p.logger.Error(err)
return offset, err
}
Expand Down
5 changes: 3 additions & 2 deletions x-pack/filebeat/input/s3/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,10 @@ func TestHandleMessage(t *testing.T) {
},
}

p := &s3Input{context: &channelContext{}}
for _, c := range casesPositive {
t.Run(c.title, func(t *testing.T) {
s3Info, err := handleSQSMessage(c.message)
s3Info, err := p.handleSQSMessage(c.message)
assert.NoError(t, err)
assert.Equal(t, len(c.expectedS3Infos), len(s3Info))
if len(s3Info) > 0 {
Expand Down Expand Up @@ -155,7 +156,7 @@ func TestHandleMessage(t *testing.T) {

for _, c := range casesNegative {
t.Run(c.title, func(t *testing.T) {
s3Info, err := handleSQSMessage(c.message)
s3Info, err := p.handleSQSMessage(c.message)
assert.Error(t, err)
assert.Nil(t, s3Info)
})
Expand Down
12 changes: 12 additions & 0 deletions x-pack/filebeat/module/aws/_meta/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@
# AWS SQS queue url
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue

# Process CloudTrail logs
# default is true, set to false to skip Cloudtrail logs
# var.process_cloudtrail_logs: false

# Process CloudTrail Digest logs
# default true, set to false to skip CloudTrail Digest logs
# var.process_digest_logs: false

# Process CloudTrail Insight logs
# default true, set to false to skip CloudTrail Insight logs
# var.process_insight_logs: false

# Filename of AWS credential file
# If not set "$HOME/.aws/credentials" is used on Linux/Mac
# "%UserProfile%\.aws\credentials" is used on Windows
Expand Down
Loading

0 comments on commit 0da7f36

Please sign in to comment.