Skip to content

Commit

Permalink
Override region according to bucket location (#27695) (#27732)
Browse files Browse the repository at this point in the history
* Override region according to bucket location

* added integration test for getRegionForBucketARN

* update docs on iam permissions

(cherry picked from commit bf68823)

Co-authored-by: Andrea Spacca <[email protected]>
  • Loading branch information
mergify[bot] and Andrea Spacca authored Sep 3, 2021
1 parent 119827c commit 39e7a14
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 8 deletions.
3 changes: 2 additions & 1 deletion x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -304,12 +304,13 @@ sqs:ChangeMessageVisibility
sqs:DeleteMessage
----

Reduced specific AWS permissions are required for IAM user to access S3
Reduced specific S3 AWS permissions are required for IAM user to access S3
when using the polling list of S3 bucket objects:

----
s3:GetObject
s3:ListBucket
s3:GetBucketLocation
----

[float]
Expand Down
33 changes: 29 additions & 4 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {

if in.config.BucketARN != "" {
// Create S3 receiver and S3 notification processor.
poller, err := in.createS3Lister(inputContext, client, persistentStore, states)
poller, err := in.createS3Lister(inputContext, ctx, client, persistentStore, states)
if err != nil {
return fmt.Errorf("failed to initialize sqs receiver: %w", err)
return fmt.Errorf("failed to initialize s3 poller: %w", err)
}
defer poller.metrics.Close()

Expand Down Expand Up @@ -193,14 +193,23 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, client beat.Client) (*sqsRe
return sqsReader, nil
}

func (in *s3Input) createS3Lister(ctx v2.Context, client beat.Client, persistentStore *statestore.Store, states *states) (*s3Poller, error) {
func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, client beat.Client, persistentStore *statestore.Store, states *states) (*s3Poller, error) {
s3ServiceName := "s3"
if in.config.FIPSEnabled {
s3ServiceName = "s3-fips"
}

s3Client := s3.New(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, s3ServiceName, in.awsConfig.Region, in.awsConfig))
regionName, err := getRegionForBucketARN(cancelCtx, s3Client, in.config.BucketARN)
if err != nil {
return nil, fmt.Errorf("failed to get AWS region for bucket_arn: %w", err)
}

in.awsConfig.Region = regionName
s3Client = s3.New(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, s3ServiceName, in.awsConfig.Region, in.awsConfig))

s3API := &awsS3API{
client: s3.New(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, s3ServiceName, in.awsConfig.Region, in.awsConfig)),
client: s3Client,
}

log := ctx.Logger.With("bucket_arn", in.config.BucketARN)
Expand Down Expand Up @@ -246,3 +255,19 @@ func getRegionFromQueueURL(queueURL string, endpoint string) (string, error) {
}
return "", fmt.Errorf("QueueURL is not in format: https://sqs.{REGION_ENDPOINT}.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME}")
}

func getRegionForBucketARN(ctx context.Context, s3Client *s3.Client, bucketARN string) (string, error) {
bucketMetadata := strings.Split(bucketARN, ":")
bucketName := bucketMetadata[len(bucketMetadata)-1]

req := s3Client.GetBucketLocationRequest(&s3.GetBucketLocationInput{
Bucket: awssdk.String(bucketName),
})

resp, err := req.Send(ctx)
if err != nil {
return "", err
}

return string(resp.LocationConstraint), nil
}
22 changes: 20 additions & 2 deletions x-pack/filebeat/input/awss3/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,24 @@ import (
"testing"
"time"

"github.com/elastic/beats/v7/filebeat/beater"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/external"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/stretchr/testify/assert"
"golang.org/x/sync/errgroup"
"gopkg.in/yaml.v2"

"github.com/elastic/beats/v7/filebeat/beater"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring"
pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/storetest"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
)

const (
Expand Down Expand Up @@ -366,3 +367,20 @@ func drainSQS(t *testing.T, tfConfig terraformOutputData) {
}
t.Logf("Drained %d SQS messages.", deletedCount)
}

func TestGetRegionForBucketARN(t *testing.T) {
logp.TestingSetup()

// Terraform is used to setup S3 and must be executed manually.
tfConfig := getTerraformOutputs(t)

awsConfig, err := external.LoadDefaultAWSConfig()
if err != nil {
t.Fatal(err)
}

s3Client := s3.New(awscommon.EnrichAWSConfigWithEndpoint("", "s3", "", awsConfig))

regionName, err := getRegionForBucketARN(context.Background(), s3Client, tfConfig.BucketName)
assert.Equal(t, tfConfig.AWSRegion, regionName)
}
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/states.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (s *states) Update(newState state, listingID string) {
// No existing state found, add new one
s.idx[id] = len(s.states)
s.states = append(s.states, newState)
s.log.Debug("input", "New state added for %s", newState.ID)
s.log.Debug("New state added for ", newState.ID)
}

if listingID == "" || (!newState.Stored && !newState.Error) {
Expand Down

0 comments on commit 39e7a14

Please sign in to comment.