From 111eb250b6350a17b73c6f978ef19b6b30d1ed7e Mon Sep 17 00:00:00 2001 From: Andrea Spacca Date: Tue, 5 Oct 2021 15:15:04 +0200 Subject: [PATCH 1/4] [Filebeat] - S3 Input - Add support for only iterating/accessing only specific folders or datapaths --- x-pack/filebeat/filebeat.reference.yml | 18 +++++++ x-pack/filebeat/input/awss3/config.go | 2 + x-pack/filebeat/input/awss3/config_test.go | 16 +++--- x-pack/filebeat/input/awss3/input.go | 4 +- .../input/awss3/input_benchmark_test.go | 4 +- .../input/awss3/input_integration_test.go | 52 +++++++++++++++++-- x-pack/filebeat/input/awss3/interfaces.go | 5 +- .../input/awss3/mock_interfaces_test.go | 23 ++++---- .../input/awss3/mock_publisher_test.go | 7 +-- x-pack/filebeat/input/awss3/s3.go | 5 +- .../filebeat/input/awss3/s3_objects_test.go | 4 +- x-pack/filebeat/input/awss3/s3_test.go | 16 +++--- x-pack/filebeat/module/aws/_meta/config.yml | 18 +++++++ .../filebeat/module/aws/_meta/docs.asciidoc | 10 ++++ .../module/aws/cloudtrail/config/aws-s3.yml | 5 ++ .../module/aws/cloudtrail/manifest.yml | 1 + .../module/aws/cloudwatch/config/aws-s3.yml | 4 ++ .../module/aws/cloudwatch/manifest.yml | 1 + .../filebeat/module/aws/ec2/config/aws-s3.yml | 4 ++ x-pack/filebeat/module/aws/ec2/manifest.yml | 1 + .../filebeat/module/aws/elb/config/aws-s3.yml | 4 ++ x-pack/filebeat/module/aws/elb/manifest.yml | 1 + .../module/aws/s3access/config/aws-s3.yml | 4 ++ .../filebeat/module/aws/s3access/manifest.yml | 1 + .../module/aws/vpcflow/config/input.yml | 4 ++ .../filebeat/module/aws/vpcflow/manifest.yml | 1 + x-pack/filebeat/modules.d/aws.yml.disabled | 18 +++++++ 27 files changed, 187 insertions(+), 46 deletions(-) diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 2fc7721ea27..f75f5a58883 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -105,6 +105,9 @@ filebeat.modules: # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS 
S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -166,6 +169,9 @@ filebeat.modules: # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -215,6 +221,9 @@ filebeat.modules: # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -264,6 +273,9 @@ filebeat.modules: # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -313,6 +325,9 @@ filebeat.modules: # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -362,6 +377,9 @@ filebeat.modules: # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s diff --git a/x-pack/filebeat/input/awss3/config.go b/x-pack/filebeat/input/awss3/config.go index 404997ddc60..4105fbd9093 100644 --- a/x-pack/filebeat/input/awss3/config.go +++ b/x-pack/filebeat/input/awss3/config.go @@ -29,6 +29,7 @@ type config struct { QueueURL string `config:"queue_url"` BucketARN string `config:"bucket_arn"` BucketListInterval time.Duration `config:"bucket_list_interval"` + BucketListPrefix string `config:"bucket_list_prefix"` NumberOfWorkers int `config:"number_of_workers"` AWSConfig awscommon.ConfigAWS `config:",inline"` FileSelectors []fileSelectorConfig `config:"file_selectors"` @@ -40,6 +41,7 @@ func defaultConfig() config { APITimeout: 120 * time.Second, VisibilityTimeout: 300 * time.Second, 
BucketListInterval: 120 * time.Second, + BucketListPrefix: "", SQSWaitTime: 20 * time.Second, SQSMaxReceiveCount: 5, FIPSEnabled: false, diff --git a/x-pack/filebeat/input/awss3/config_test.go b/x-pack/filebeat/input/awss3/config_test.go index cd75d4df19c..f8573d60525 100644 --- a/x-pack/filebeat/input/awss3/config_test.go +++ b/x-pack/filebeat/input/awss3/config_test.go @@ -28,13 +28,15 @@ func TestConfig(t *testing.T) { parserConf := parser.Config{} require.NoError(t, parserConf.Unpack(common.MustNewConfigFrom(""))) return config{ - QueueURL: quequeURL, - BucketARN: s3Bucket, - APITimeout: 120 * time.Second, - VisibilityTimeout: 300 * time.Second, - SQSMaxReceiveCount: 5, - SQSWaitTime: 20 * time.Second, - BucketListInterval: 120 * time.Second, + QueueURL: quequeURL, + BucketARN: s3Bucket, + APITimeout: 120 * time.Second, + VisibilityTimeout: 300 * time.Second, + SQSMaxReceiveCount: 5, + SQSWaitTime: 20 * time.Second, + BucketListInterval: 120 * time.Second, + BucketListPrefix: "", + FIPSEnabled: false, MaxNumberOfMessages: 5, ReaderConfig: readerConfig{ diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 545c24db322..4cd5a2c82b7 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -215,6 +215,7 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli log := ctx.Logger.With("bucket_arn", in.config.BucketARN) log.Infof("number_of_workers is set to %v.", in.config.NumberOfWorkers) log.Infof("bucket_list_interval is set to %v.", in.config.BucketListInterval) + log.Infof("bucket_list_prefix is set to %v.", in.config.BucketListPrefix) log.Infof("AWS region is set to %v.", in.awsConfig.Region) log.Debugf("AWS S3 service name is %v.", s3ServiceName) @@ -233,6 +234,7 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli states, persistentStore, in.config.BucketARN, + in.config.BucketListPrefix, in.awsConfig.Region, 
in.config.NumberOfWorkers, in.config.BucketListInterval) @@ -269,5 +271,5 @@ func getRegionForBucketARN(ctx context.Context, s3Client *s3.Client, bucketARN s return "", err } - return string(resp.LocationConstraint), nil + return string(s3.NormalizeBucketLocation(resp.LocationConstraint)), nil } diff --git a/x-pack/filebeat/input/awss3/input_benchmark_test.go b/x-pack/filebeat/input/awss3/input_benchmark_test.go index 00540479d5c..9c718c83ac8 100644 --- a/x-pack/filebeat/input/awss3/input_benchmark_test.go +++ b/x-pack/filebeat/input/awss3/input_benchmark_test.go @@ -134,7 +134,7 @@ func (c constantS3) GetObject(ctx context.Context, bucket, key string) (*s3.GetO return newS3GetObjectResponse(c.filename, c.data, c.contentType), nil } -func (c constantS3) ListObjectsPaginator(bucket string) s3Pager { +func (c constantS3) ListObjectsPaginator(bucket, prefix string) s3Pager { return c.pagerConstant } @@ -277,7 +277,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult } s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, conf.FileSelectors) - s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", "region", numberOfWorkers, time.Second) + s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", "key-", "region", numberOfWorkers, time.Second) ctx, cancel := context.WithCancel(context.Background()) b.Cleanup(cancel) diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 7b157dcfb6d..cd62d7f7d36 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -4,9 +4,6 @@ // See _meta/terraform/README.md for integration test usage instructions. 
-// +build integration -// +build aws - package awss3 import ( @@ -384,3 +381,52 @@ func TestGetRegionForBucketARN(t *testing.T) { regionName, err := getRegionForBucketARN(context.Background(), s3Client, tfConfig.BucketName) assert.Equal(t, tfConfig.AWSRegion, regionName) } + +func TestPaginatorListPrefix(t *testing.T) { + logp.TestingSetup() + + // Terraform is used to setup S3 and must be executed manually. + tfConfig := getTerraformOutputs(t) + + uploadS3TestFiles(t, tfConfig.AWSRegion, tfConfig.BucketName, + "testdata/events-array.json", + "testdata/invalid.json", + "testdata/log.json", + "testdata/log.ndjson", + "testdata/multiline.json", + "testdata/multiline.json.gz", + "testdata/multiline.txt", + "testdata/log.txt", // Skipped (no match). + ) + + awsConfig, err := external.LoadDefaultAWSConfig() + awsConfig.Region = tfConfig.AWSRegion + if err != nil { + t.Fatal(err) + } + + s3Client := s3.New(awscommon.EnrichAWSConfigWithEndpoint("", "s3", "", awsConfig)) + + s3API := &awsS3API{ + client: s3Client, + } + + var objects []string + paginator := s3API.ListObjectsPaginator(tfConfig.BucketName, "log") + for paginator.Next(context.Background()) { + page := paginator.CurrentPage() + for _, object := range page.Contents { + objects = append(objects, *object.Key) + } + } + + assert.NoError(t, paginator.Err()) + + expected := []string{ + "log.json", + "log.ndjson", + "log.txt", + } + + assert.Equal(t, expected, objects) +} diff --git a/x-pack/filebeat/input/awss3/interfaces.go b/x-pack/filebeat/input/awss3/interfaces.go index 2e406717348..c777072c6c9 100644 --- a/x-pack/filebeat/input/awss3/interfaces.go +++ b/x-pack/filebeat/input/awss3/interfaces.go @@ -66,7 +66,7 @@ type s3Getter interface { } type s3Lister interface { - ListObjectsPaginator(bucket string) s3Pager + ListObjectsPaginator(bucket, prefix string) s3Pager } type s3Pager interface { @@ -204,9 +204,10 @@ func (a *awsS3API) GetObject(ctx context.Context, bucket, key string) (*s3.GetOb return resp, nil } 
-func (a *awsS3API) ListObjectsPaginator(bucket string) s3Pager { +func (a *awsS3API) ListObjectsPaginator(bucket, prefix string) s3Pager { req := a.client.ListObjectsRequest(&s3.ListObjectsInput{ Bucket: awssdk.String(bucket), + Prefix: awssdk.String(prefix), }) pager := s3.NewListObjectsPaginator(req) diff --git a/x-pack/filebeat/input/awss3/mock_interfaces_test.go b/x-pack/filebeat/input/awss3/mock_interfaces_test.go index 1929fa7c9ec..007abe820e6 100644 --- a/x-pack/filebeat/input/awss3/mock_interfaces_test.go +++ b/x-pack/filebeat/input/awss3/mock_interfaces_test.go @@ -1,7 +1,3 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - // Code generated by MockGen. DO NOT EDIT. // Source: interfaces.go @@ -15,9 +11,8 @@ import ( s3 "github.com/aws/aws-sdk-go-v2/service/s3" sqs "github.com/aws/aws-sdk-go-v2/service/sqs" - gomock "github.com/golang/mock/gomock" - logp "github.com/elastic/beats/v7/libbeat/logp" + gomock "github.com/golang/mock/gomock" ) // MockSQSAPI is a mock of sqsAPI interface. @@ -274,17 +269,17 @@ func (mr *MockS3APIMockRecorder) GetObject(ctx, bucket, key interface{}) *gomock } // ListObjectsPaginator mocks base method. -func (m *MockS3API) ListObjectsPaginator(bucket string) s3Pager { +func (m *MockS3API) ListObjectsPaginator(bucket, prefix string) s3Pager { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListObjectsPaginator", bucket) + ret := m.ctrl.Call(m, "ListObjectsPaginator", bucket, prefix) ret0, _ := ret[0].(s3Pager) return ret0 } // ListObjectsPaginator indicates an expected call of ListObjectsPaginator. 
-func (mr *MockS3APIMockRecorder) ListObjectsPaginator(bucket interface{}) *gomock.Call { +func (mr *MockS3APIMockRecorder) ListObjectsPaginator(bucket, prefix interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListObjectsPaginator", reflect.TypeOf((*MockS3API)(nil).ListObjectsPaginator), bucket) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListObjectsPaginator", reflect.TypeOf((*MockS3API)(nil).ListObjectsPaginator), bucket, prefix) } // Mocks3Getter is a mock of s3Getter interface. @@ -349,17 +344,17 @@ func (m *Mocks3Lister) EXPECT() *Mocks3ListerMockRecorder { } // ListObjectsPaginator mocks base method. -func (m *Mocks3Lister) ListObjectsPaginator(bucket string) s3Pager { +func (m *Mocks3Lister) ListObjectsPaginator(bucket, prefix string) s3Pager { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListObjectsPaginator", bucket) + ret := m.ctrl.Call(m, "ListObjectsPaginator", bucket, prefix) ret0, _ := ret[0].(s3Pager) return ret0 } // ListObjectsPaginator indicates an expected call of ListObjectsPaginator. -func (mr *Mocks3ListerMockRecorder) ListObjectsPaginator(bucket interface{}) *gomock.Call { +func (mr *Mocks3ListerMockRecorder) ListObjectsPaginator(bucket, prefix interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListObjectsPaginator", reflect.TypeOf((*Mocks3Lister)(nil).ListObjectsPaginator), bucket) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListObjectsPaginator", reflect.TypeOf((*Mocks3Lister)(nil).ListObjectsPaginator), bucket, prefix) } // MockS3Pager is a mock of s3Pager interface. diff --git a/x-pack/filebeat/input/awss3/mock_publisher_test.go b/x-pack/filebeat/input/awss3/mock_publisher_test.go index 7fa935496aa..40c46062a38 100644 --- a/x-pack/filebeat/input/awss3/mock_publisher_test.go +++ b/x-pack/filebeat/input/awss3/mock_publisher_test.go @@ -1,7 +1,3 @@ -// Copyright Elasticsearch B.V. 
and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - // Code generated by MockGen. DO NOT EDIT. // Source: github.com/elastic/beats/v7/libbeat/beat (interfaces: Client) @@ -11,9 +7,8 @@ package awss3 import ( reflect "reflect" - gomock "github.com/golang/mock/gomock" - beat "github.com/elastic/beats/v7/libbeat/beat" + gomock "github.com/golang/mock/gomock" ) // MockBeatClient is a mock of Client interface. diff --git a/x-pack/filebeat/input/awss3/s3.go b/x-pack/filebeat/input/awss3/s3.go index 313a71211e7..d69c9eccb44 100644 --- a/x-pack/filebeat/input/awss3/s3.go +++ b/x-pack/filebeat/input/awss3/s3.go @@ -41,6 +41,7 @@ type s3ObjectPayload struct { type s3Poller struct { numberOfWorkers int bucket string + listPrefix string region string bucketPollInterval time.Duration workerSem *sem @@ -61,6 +62,7 @@ func newS3Poller(log *logp.Logger, states *states, store *statestore.Store, bucket string, + listPrefix string, awsRegion string, numberOfWorkers int, bucketPollInterval time.Duration) *s3Poller { @@ -70,6 +72,7 @@ func newS3Poller(log *logp.Logger, return &s3Poller{ numberOfWorkers: numberOfWorkers, bucket: bucket, + listPrefix: listPrefix, region: awsRegion, bucketPollInterval: bucketPollInterval, workerSem: newSem(numberOfWorkers), @@ -142,7 +145,7 @@ func (p *s3Poller) GetS3Objects(ctx context.Context, s3ObjectPayloadChan chan<- bucketMetadata := strings.Split(p.bucket, ":") bucketName := bucketMetadata[len(bucketMetadata)-1] - paginator := p.s3.ListObjectsPaginator(bucketName) + paginator := p.s3.ListObjectsPaginator(bucketName, p.listPrefix) for paginator.Next(ctx) { listingID, err := uuid.NewV4() if err != nil { diff --git a/x-pack/filebeat/input/awss3/s3_objects_test.go b/x-pack/filebeat/input/awss3/s3_objects_test.go index 6cf1ea1fa5a..4ff6e4754f0 100644 --- 
a/x-pack/filebeat/input/awss3/s3_objects_test.go +++ b/x-pack/filebeat/input/awss3/s3_objects_test.go @@ -212,11 +212,11 @@ func TestNewMockS3Pager(t *testing.T) { defer ctrl.Finish() mockS3Pager := newMockS3Pager(ctrl, 1, fakeObjects) mockS3API := NewMockS3API(ctrl) - mockS3API.EXPECT().ListObjectsPaginator(gomock.Any()).Return(mockS3Pager) + mockS3API.EXPECT().ListObjectsPaginator(gomock.Any(), "").Return(mockS3Pager) // Test the mock. var keys []string - pager := mockS3API.ListObjectsPaginator("nombre") + pager := mockS3API.ListObjectsPaginator("nombre", "") for pager.Next(ctx) { for _, s3Obj := range pager.CurrentPage().Contents { keys = append(keys, *s3Obj.Key) diff --git a/x-pack/filebeat/input/awss3/s3_test.go b/x-pack/filebeat/input/awss3/s3_test.go index dc87356ba65..250730ab055 100644 --- a/x-pack/filebeat/input/awss3/s3_test.go +++ b/x-pack/filebeat/input/awss3/s3_test.go @@ -48,9 +48,9 @@ func TestS3Poller(t *testing.T) { gomock.InOrder( mockAPI.EXPECT(). - ListObjectsPaginator(gomock.Eq(bucket)). + ListObjectsPaginator(gomock.Eq(bucket), gomock.Eq("key")). Times(1). - DoAndReturn(func(_ string) s3Pager { + DoAndReturn(func(_, _ string) s3Pager { return mockPager }), ) @@ -133,7 +133,7 @@ func TestS3Poller(t *testing.T) { Return(nil, errFakeConnectivityFailure) s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, mockPublisher, nil) - receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, "region", numberOfWorkers, pollInterval) + receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, "key", "region", numberOfWorkers, pollInterval) require.Error(t, context.DeadlineExceeded, receiver.Poll(ctx)) assert.Equal(t, numberOfWorkers, receiver.workerSem.available) }) @@ -158,16 +158,16 @@ func TestS3Poller(t *testing.T) { gomock.InOrder( // Initial ListObjectPaginator gets an error. mockAPI.EXPECT(). 
- ListObjectsPaginator(gomock.Eq(bucket)). + ListObjectsPaginator(gomock.Eq(bucket), gomock.Eq("key")). Times(1). - DoAndReturn(func(_ string) s3Pager { + DoAndReturn(func(_, _ string) s3Pager { return mockPagerFirst }), // After waiting for pollInterval, it retries. mockAPI.EXPECT(). - ListObjectsPaginator(gomock.Eq(bucket)). + ListObjectsPaginator(gomock.Eq(bucket), gomock.Eq("key")). Times(1). - DoAndReturn(func(_ string) s3Pager { + DoAndReturn(func(_, _ string) s3Pager { return mockPagerSecond }), ) @@ -263,7 +263,7 @@ func TestS3Poller(t *testing.T) { Return(nil, errFakeConnectivityFailure) s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, mockPublisher, nil) - receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, "region", numberOfWorkers, pollInterval) + receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, "key", "region", numberOfWorkers, pollInterval) require.Error(t, context.DeadlineExceeded, receiver.Poll(ctx)) assert.Equal(t, numberOfWorkers, receiver.workerSem.available) }) diff --git a/x-pack/filebeat/module/aws/_meta/config.yml b/x-pack/filebeat/module/aws/_meta/config.yml index 393a197bc4e..1b139779b84 100644 --- a/x-pack/filebeat/module/aws/_meta/config.yml +++ b/x-pack/filebeat/module/aws/_meta/config.yml @@ -8,6 +8,9 @@ # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -69,6 +72,9 @@ # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -118,6 +124,9 @@ # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket 
#var.bucket_list_interval: 300s @@ -167,6 +176,9 @@ # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -216,6 +228,9 @@ # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -265,6 +280,9 @@ # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s diff --git a/x-pack/filebeat/module/aws/_meta/docs.asciidoc b/x-pack/filebeat/module/aws/_meta/docs.asciidoc index a55dbc4583c..4651e902017 100644 --- a/x-pack/filebeat/module/aws/_meta/docs.asciidoc +++ b/x-pack/filebeat/module/aws/_meta/docs.asciidoc @@ -45,6 +45,7 @@ Example config: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue #var.bucket_arn: 'arn:aws:s3:::mybucket' + #var.bucket_list_prefix: 'prefix' #var.bucket_list_interval: 300s #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials @@ -62,6 +63,7 @@ Example config: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue #var.bucket_arn: 'arn:aws:s3:::mybucket' + #var.bucket_list_prefix: 'prefix' #var.bucket_list_interval: 300s #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials @@ -79,6 +81,7 @@ Example config: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue #var.bucket_arn: 'arn:aws:s3:::mybucket' + #var.bucket_list_prefix: 'prefix' #var.bucket_list_interval: 300s #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials @@ -96,6 +99,7 @@ Example config: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue #var.bucket_arn: 'arn:aws:s3:::mybucket' + 
#var.bucket_list_prefix: 'prefix' #var.bucket_list_interval: 300s #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials @@ -113,6 +117,7 @@ Example config: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue #var.bucket_arn: 'arn:aws:s3:::mybucket' + #var.bucket_list_prefix: 'prefix' #var.bucket_list_interval: 300s #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials @@ -130,6 +135,7 @@ Example config: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue #var.bucket_arn: 'arn:aws:s3:::mybucket' + #var.bucket_list_prefix: 'prefix' #var.bucket_list_interval: 300s #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials @@ -173,6 +179,10 @@ Use to vertically scale the input. Wait interval between completion of a list request to the S3 bucket and beginning of the next one. Default to be 120 seconds. +*`var.bucket_list_prefix`*:: + +Prefix to apply for the list request to the S3 bucket. Default empty. + *`var.endpoint`*:: Custom endpoint used to access AWS APIs. 
diff --git a/x-pack/filebeat/module/aws/cloudtrail/config/aws-s3.yml b/x-pack/filebeat/module/aws/cloudtrail/config/aws-s3.yml index 6134344678e..c95abb1cdc2 100644 --- a/x-pack/filebeat/module/aws/cloudtrail/config/aws-s3.yml +++ b/x-pack/filebeat/module/aws/cloudtrail/config/aws-s3.yml @@ -13,6 +13,11 @@ number_of_workers: {{ .number_of_workers }} {{ if .bucket_list_interval }} bucket_list_interval: {{ .bucket_list_interval }} {{ end }} + +{{ if .bucket_list_prefix }} +bucket_list_prefix: {{ .bucket_list_prefix }} +{{ end }} + file_selectors: {{ if .process_cloudtrail_logs }} - regex: '/CloudTrail/' diff --git a/x-pack/filebeat/module/aws/cloudtrail/manifest.yml b/x-pack/filebeat/module/aws/cloudtrail/manifest.yml index 6d2c9cdebe0..c0715d7647c 100644 --- a/x-pack/filebeat/module/aws/cloudtrail/manifest.yml +++ b/x-pack/filebeat/module/aws/cloudtrail/manifest.yml @@ -7,6 +7,7 @@ var: - name: bucket_arn - name: number_of_workers - name: bucket_list_interval + - name: bucket_list_prefix - name: shared_credential_file - name: credential_profile_name - name: visibility_timeout diff --git a/x-pack/filebeat/module/aws/cloudwatch/config/aws-s3.yml b/x-pack/filebeat/module/aws/cloudwatch/config/aws-s3.yml index c98582c21ea..5b1fb24f561 100644 --- a/x-pack/filebeat/module/aws/cloudwatch/config/aws-s3.yml +++ b/x-pack/filebeat/module/aws/cloudwatch/config/aws-s3.yml @@ -14,6 +14,10 @@ number_of_workers: {{ .number_of_workers }} bucket_list_interval: {{ .bucket_list_interval }} {{ end }} +{{ if .bucket_list_prefix }} +bucket_list_prefix: {{ .bucket_list_prefix }} +{{ end }} + {{ if .credential_profile_name }} credential_profile_name: {{ .credential_profile_name }} {{ end }} diff --git a/x-pack/filebeat/module/aws/cloudwatch/manifest.yml b/x-pack/filebeat/module/aws/cloudwatch/manifest.yml index 7634f73d8d2..0223142a6a9 100644 --- a/x-pack/filebeat/module/aws/cloudwatch/manifest.yml +++ b/x-pack/filebeat/module/aws/cloudwatch/manifest.yml @@ -7,6 +7,7 @@ var: - name: 
bucket_arn - name: number_of_workers - name: bucket_list_interval + - name: bucket_list_prefix - name: shared_credential_file - name: credential_profile_name - name: visibility_timeout diff --git a/x-pack/filebeat/module/aws/ec2/config/aws-s3.yml b/x-pack/filebeat/module/aws/ec2/config/aws-s3.yml index c98582c21ea..5b1fb24f561 100644 --- a/x-pack/filebeat/module/aws/ec2/config/aws-s3.yml +++ b/x-pack/filebeat/module/aws/ec2/config/aws-s3.yml @@ -14,6 +14,10 @@ number_of_workers: {{ .number_of_workers }} bucket_list_interval: {{ .bucket_list_interval }} {{ end }} +{{ if .bucket_list_prefix }} +bucket_list_prefix: {{ .bucket_list_prefix }} +{{ end }} + {{ if .credential_profile_name }} credential_profile_name: {{ .credential_profile_name }} {{ end }} diff --git a/x-pack/filebeat/module/aws/ec2/manifest.yml b/x-pack/filebeat/module/aws/ec2/manifest.yml index 7634f73d8d2..0223142a6a9 100644 --- a/x-pack/filebeat/module/aws/ec2/manifest.yml +++ b/x-pack/filebeat/module/aws/ec2/manifest.yml @@ -7,6 +7,7 @@ var: - name: bucket_arn - name: number_of_workers - name: bucket_list_interval + - name: bucket_list_prefix - name: shared_credential_file - name: credential_profile_name - name: visibility_timeout diff --git a/x-pack/filebeat/module/aws/elb/config/aws-s3.yml b/x-pack/filebeat/module/aws/elb/config/aws-s3.yml index c98582c21ea..5b1fb24f561 100644 --- a/x-pack/filebeat/module/aws/elb/config/aws-s3.yml +++ b/x-pack/filebeat/module/aws/elb/config/aws-s3.yml @@ -14,6 +14,10 @@ number_of_workers: {{ .number_of_workers }} bucket_list_interval: {{ .bucket_list_interval }} {{ end }} +{{ if .bucket_list_prefix }} +bucket_list_prefix: {{ .bucket_list_prefix }} +{{ end }} + {{ if .credential_profile_name }} credential_profile_name: {{ .credential_profile_name }} {{ end }} diff --git a/x-pack/filebeat/module/aws/elb/manifest.yml b/x-pack/filebeat/module/aws/elb/manifest.yml index 128dc59791e..da22ed1b1cc 100644 --- a/x-pack/filebeat/module/aws/elb/manifest.yml +++ 
b/x-pack/filebeat/module/aws/elb/manifest.yml @@ -7,6 +7,7 @@ var: - name: bucket_arn - name: number_of_workers - name: bucket_list_interval + - name: bucket_list_prefix - name: shared_credential_file - name: credential_profile_name - name: visibility_timeout diff --git a/x-pack/filebeat/module/aws/s3access/config/aws-s3.yml b/x-pack/filebeat/module/aws/s3access/config/aws-s3.yml index c98582c21ea..5b1fb24f561 100644 --- a/x-pack/filebeat/module/aws/s3access/config/aws-s3.yml +++ b/x-pack/filebeat/module/aws/s3access/config/aws-s3.yml @@ -14,6 +14,10 @@ number_of_workers: {{ .number_of_workers }} bucket_list_interval: {{ .bucket_list_interval }} {{ end }} +{{ if .bucket_list_prefix }} +bucket_list_prefix: {{ .bucket_list_prefix }} +{{ end }} + {{ if .credential_profile_name }} credential_profile_name: {{ .credential_profile_name }} {{ end }} diff --git a/x-pack/filebeat/module/aws/s3access/manifest.yml b/x-pack/filebeat/module/aws/s3access/manifest.yml index 7634f73d8d2..0223142a6a9 100644 --- a/x-pack/filebeat/module/aws/s3access/manifest.yml +++ b/x-pack/filebeat/module/aws/s3access/manifest.yml @@ -7,6 +7,7 @@ var: - name: bucket_arn - name: number_of_workers - name: bucket_list_interval + - name: bucket_list_prefix - name: shared_credential_file - name: credential_profile_name - name: visibility_timeout diff --git a/x-pack/filebeat/module/aws/vpcflow/config/input.yml b/x-pack/filebeat/module/aws/vpcflow/config/input.yml index f5987c033d5..51b649b1e2e 100644 --- a/x-pack/filebeat/module/aws/vpcflow/config/input.yml +++ b/x-pack/filebeat/module/aws/vpcflow/config/input.yml @@ -16,6 +16,10 @@ number_of_workers: {{ .number_of_workers }} bucket_list_interval: {{ .bucket_list_interval }} {{ end }} +{{ if .bucket_list_prefix }} +bucket_list_prefix: {{ .bucket_list_prefix }} +{{ end }} + {{ if .credential_profile_name }} credential_profile_name: {{ .credential_profile_name }} {{ end }} diff --git a/x-pack/filebeat/module/aws/vpcflow/manifest.yml 
b/x-pack/filebeat/module/aws/vpcflow/manifest.yml index 87850096ed5..8871cf1cffb 100644 --- a/x-pack/filebeat/module/aws/vpcflow/manifest.yml +++ b/x-pack/filebeat/module/aws/vpcflow/manifest.yml @@ -7,6 +7,7 @@ var: - name: bucket_arn - name: number_of_workers - name: bucket_list_interval + - name: bucket_list_prefix - name: shared_credential_file - name: credential_profile_name - name: visibility_timeout diff --git a/x-pack/filebeat/modules.d/aws.yml.disabled b/x-pack/filebeat/modules.d/aws.yml.disabled index 57e370541c8..3d34116d225 100644 --- a/x-pack/filebeat/modules.d/aws.yml.disabled +++ b/x-pack/filebeat/modules.d/aws.yml.disabled @@ -11,6 +11,9 @@ # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -72,6 +75,9 @@ # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -121,6 +127,9 @@ # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -170,6 +179,9 @@ # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -219,6 +231,9 @@ # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -268,6 +283,9 @@ # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s From 01ff21ffa7d929bfcb075f09910a73f469c2b997 Mon Sep 17 00:00:00 2001 From: Andrea Spacca Date: 
Tue, 5 Oct 2021 15:19:20 +0200 Subject: [PATCH 2/4] changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 812f5d15e21..5e09b13d0fe 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -305,6 +305,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Update indentation for azure filebeat configuration. {pull}26604[26604] - Update Sophos xg module pipeline to deal with missing `date` and `time` fields. {pull}27834[27834] - sophos/xg fileset: Add missing pipeline for System Health logs. {pull}27827[27827] {issue}27826[27826] +- Add support for passing a prefix on S3 bucket list mode for AWS-S3 input {pull}28252[28252] {issue}27965[27965] *Heartbeat* From 36db120a59dedeb20f58593bac65a122a7800e5d Mon Sep 17 00:00:00 2001 From: Andrea Spacca Date: Tue, 5 Oct 2021 16:02:49 +0200 Subject: [PATCH 3/4] make update --- filebeat/docs/modules/aws.asciidoc | 10 ++++++++++ x-pack/filebeat/input/awss3/input_integration_test.go | 3 +++ x-pack/filebeat/input/awss3/mock_interfaces_test.go | 7 ++++++- x-pack/filebeat/input/awss3/mock_publisher_test.go | 7 ++++++- 4 files changed, 25 insertions(+), 2 deletions(-) diff --git a/filebeat/docs/modules/aws.asciidoc b/filebeat/docs/modules/aws.asciidoc index 0035f225e38..7598f416dda 100644 --- a/filebeat/docs/modules/aws.asciidoc +++ b/filebeat/docs/modules/aws.asciidoc @@ -50,6 +50,7 @@ Example config: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue #var.bucket_arn: 'arn:aws:s3:::mybucket' + #var.bucket_list_prefix: 'prefix' #var.bucket_list_interval: 300s #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials @@ -67,6 +68,7 @@ Example config: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue #var.bucket_arn: 'arn:aws:s3:::mybucket' + #var.bucket_list_prefix: 'prefix' #var.bucket_list_interval: 300s 
#var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials @@ -84,6 +86,7 @@ Example config: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue #var.bucket_arn: 'arn:aws:s3:::mybucket' + #var.bucket_list_prefix: 'prefix' #var.bucket_list_interval: 300s #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials @@ -101,6 +104,7 @@ Example config: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue #var.bucket_arn: 'arn:aws:s3:::mybucket' + #var.bucket_list_prefix: 'prefix' #var.bucket_list_interval: 300s #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials @@ -118,6 +122,7 @@ Example config: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue #var.bucket_arn: 'arn:aws:s3:::mybucket' + #var.bucket_list_prefix: 'prefix' #var.bucket_list_interval: 300s #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials @@ -135,6 +140,7 @@ Example config: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue #var.bucket_arn: 'arn:aws:s3:::mybucket' + #var.bucket_list_prefix: 'prefix' #var.bucket_list_interval: 300s #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials @@ -178,6 +184,10 @@ Use to vertically scale the input. Wait interval between completion of a list request to the S3 bucket and beginning of the next one. Default to be 120 seconds. +*`var.bucket_bucket_list_prefix`*:: + +Prefix to apply for the list request to the S3 bucket. Default empty. + *`var.endpoint`*:: Custom endpoint used to access AWS APIs. 
diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index cd62d7f7d36..1a2fadef022 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -4,6 +4,9 @@ // See _meta/terraform/README.md for integration test usage instructions. +// +build integration +// +build aws + package awss3 import ( diff --git a/x-pack/filebeat/input/awss3/mock_interfaces_test.go b/x-pack/filebeat/input/awss3/mock_interfaces_test.go index 007abe820e6..85c11e0fe80 100644 --- a/x-pack/filebeat/input/awss3/mock_interfaces_test.go +++ b/x-pack/filebeat/input/awss3/mock_interfaces_test.go @@ -1,3 +1,7 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + // Code generated by MockGen. DO NOT EDIT. // Source: interfaces.go @@ -11,8 +15,9 @@ import ( s3 "github.com/aws/aws-sdk-go-v2/service/s3" sqs "github.com/aws/aws-sdk-go-v2/service/sqs" - logp "github.com/elastic/beats/v7/libbeat/logp" gomock "github.com/golang/mock/gomock" + + logp "github.com/elastic/beats/v7/libbeat/logp" ) // MockSQSAPI is a mock of sqsAPI interface. diff --git a/x-pack/filebeat/input/awss3/mock_publisher_test.go b/x-pack/filebeat/input/awss3/mock_publisher_test.go index 40c46062a38..7fa935496aa 100644 --- a/x-pack/filebeat/input/awss3/mock_publisher_test.go +++ b/x-pack/filebeat/input/awss3/mock_publisher_test.go @@ -1,3 +1,7 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + // Code generated by MockGen. DO NOT EDIT. 
// Source: github.com/elastic/beats/v7/libbeat/beat (interfaces: Client) @@ -7,8 +11,9 @@ package awss3 import ( reflect "reflect" - beat "github.com/elastic/beats/v7/libbeat/beat" gomock "github.com/golang/mock/gomock" + + beat "github.com/elastic/beats/v7/libbeat/beat" ) // MockBeatClient is a mock of Client interface. From b2fc0162a6bc3d545dc8c878ff15662aac25ee67 Mon Sep 17 00:00:00 2001 From: Andrea Spacca Date: Tue, 5 Oct 2021 18:43:07 +0200 Subject: [PATCH 4/4] cr fixes --- filebeat/docs/modules/aws.asciidoc | 2 +- x-pack/filebeat/module/aws/_meta/docs.asciidoc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/filebeat/docs/modules/aws.asciidoc b/filebeat/docs/modules/aws.asciidoc index 7598f416dda..097e22a741c 100644 --- a/filebeat/docs/modules/aws.asciidoc +++ b/filebeat/docs/modules/aws.asciidoc @@ -184,7 +184,7 @@ Use to vertically scale the input. Wait interval between completion of a list request to the S3 bucket and beginning of the next one. Default to be 120 seconds. -*`var.bucket_bucket_list_prefix`*:: +*`var.bucket_list_prefix`*:: Prefix to apply for the list request to the S3 bucket. Default empty. diff --git a/x-pack/filebeat/module/aws/_meta/docs.asciidoc b/x-pack/filebeat/module/aws/_meta/docs.asciidoc index 4651e902017..3fee8460161 100644 --- a/x-pack/filebeat/module/aws/_meta/docs.asciidoc +++ b/x-pack/filebeat/module/aws/_meta/docs.asciidoc @@ -179,7 +179,7 @@ Use to vertically scale the input. Wait interval between completion of a list request to the S3 bucket and beginning of the next one. Default to be 120 seconds. -*`var.bucket_bucket_list_prefix`*:: +*`var.bucket_list_prefix`*:: Prefix to apply for the list request to the S3 bucket. Default empty.