From 39e7a145560bca99d29c26012f0802d1441feee7 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 3 Sep 2021 11:56:58 +0200 Subject: [PATCH] Override region according to bucket location (#27695) (#27732) * Override region according to bucket location * added integration test for getRegionForBucketARN * update docs on iam permissions (cherry picked from commit bf688235e1a7eaf167574209bf97d10b4f9fbf86) Co-authored-by: Andrea Spacca --- .../docs/inputs/input-aws-s3.asciidoc | 3 +- x-pack/filebeat/input/awss3/input.go | 33 ++++++++++++++++--- .../input/awss3/input_integration_test.go | 22 +++++++++++-- x-pack/filebeat/input/awss3/states.go | 2 +- 4 files changed, 52 insertions(+), 8 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index dbeb34d1ef2..10be92ca410 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -304,12 +304,13 @@ sqs:ChangeMessageVisibility sqs:DeleteMessage ---- -Reduced specific AWS permissions are required for IAM user to access S3 +Reduced specific S3 AWS permissions are required for IAM user to access S3 when using the polling list of S3 bucket objects: ---- s3:GetObject s3:ListBucket +s3:GetBucketLocation ---- [float] diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 6662984e060..545c24db322 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -140,9 +140,9 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { if in.config.BucketARN != "" { // Create S3 receiver and S3 notification processor. - poller, err := in.createS3Lister(inputContext, client, persistentStore, states) + poller, err := in.createS3Lister(inputContext, ctx, client, persistentStore, states) if err != nil { - return fmt.Errorf("failed to initialize sqs receiver: %w", err) + return fmt.Errorf("failed to initialize s3 poller: %w", err) } defer poller.metrics.Close() @@ -193,14 +193,23 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, client beat.Client) (*sqsRe return sqsReader, nil } -func (in *s3Input) createS3Lister(ctx v2.Context, client beat.Client, persistentStore *statestore.Store, states *states) (*s3Poller, error) { +func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, client beat.Client, persistentStore *statestore.Store, states *states) (*s3Poller, error) { s3ServiceName := "s3" if in.config.FIPSEnabled { s3ServiceName = "s3-fips" } + s3Client := s3.New(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, s3ServiceName, in.awsConfig.Region, in.awsConfig)) + regionName, err := getRegionForBucketARN(cancelCtx, s3Client, in.config.BucketARN) + if err != nil { + return nil, fmt.Errorf("failed to get AWS region for bucket_arn: %w", err) + } + + in.awsConfig.Region = regionName + s3Client = s3.New(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, s3ServiceName, in.awsConfig.Region, in.awsConfig)) + s3API := &awsS3API{ - client: s3.New(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, s3ServiceName, in.awsConfig.Region, in.awsConfig)), + client: s3Client, } log := ctx.Logger.With("bucket_arn", in.config.BucketARN) @@ -246,3 +255,19 @@ func getRegionFromQueueURL(queueURL string, endpoint string) (string, error) { } return "", fmt.Errorf("QueueURL is not in format: https://sqs.{REGION_ENDPOINT}.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME}") } + +func getRegionForBucketARN(ctx context.Context, s3Client *s3.Client, bucketARN string) (string, error) { + bucketMetadata := strings.Split(bucketARN, ":") + bucketName := bucketMetadata[len(bucketMetadata)-1] + + req := s3Client.GetBucketLocationRequest(&s3.GetBucketLocationInput{ + Bucket: awssdk.String(bucketName), + }) + + resp, err := req.Send(ctx) + if err != nil { + return "", err + } + + return string(resp.LocationConstraint), nil +} diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 4b8a3115db6..7b157dcfb6d 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -19,16 +19,16 @@ import ( "testing" "time" - "github.com/elastic/beats/v7/filebeat/beater" - "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws/external" + "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/s3manager" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/stretchr/testify/assert" "golang.org/x/sync/errgroup" "gopkg.in/yaml.v2" + "github.com/elastic/beats/v7/filebeat/beater" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" @@ -36,6 +36,7 @@ import ( pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/beats/v7/libbeat/statestore/storetest" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" ) const ( @@ -366,3 +367,20 @@ func drainSQS(t *testing.T, tfConfig terraformOutputData) { } t.Logf("Drained %d SQS messages.", deletedCount) } + +func TestGetRegionForBucketARN(t *testing.T) { + logp.TestingSetup() + + // Terraform is used to setup S3 and must be executed manually. + tfConfig := getTerraformOutputs(t) + + awsConfig, err := external.LoadDefaultAWSConfig() + if err != nil { + t.Fatal(err) + } + + s3Client := s3.New(awscommon.EnrichAWSConfigWithEndpoint("", "s3", "", awsConfig)) + + regionName, err := getRegionForBucketARN(context.Background(), s3Client, tfConfig.BucketName) + assert.Equal(t, tfConfig.AWSRegion, regionName) +} diff --git a/x-pack/filebeat/input/awss3/states.go b/x-pack/filebeat/input/awss3/states.go index 6674ee104c1..70db34797db 100644 --- a/x-pack/filebeat/input/awss3/states.go +++ b/x-pack/filebeat/input/awss3/states.go @@ -144,7 +144,7 @@ func (s *states) Update(newState state, listingID string) { // No existing state found, add new one s.idx[id] = len(s.states) s.states = append(s.states, newState) - s.log.Debug("input", "New state added for %s", newState.ID) + s.log.Debug("New state added for ", newState.ID) } if listingID == "" || (!newState.Stored && !newState.Error) {