diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index fe00a7dc4bd..d8e8b320023 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -782,6 +782,7 @@ for a few releases. Please use other tools provided by Elastic to fetch data fro - Add latency config option for aws-cloudwatch input. {pull}28509[28509] - Added proxy support to threatintel/malwarebazaar. {pull}28533[28533] - Add `text/csv` decoder to `httpjson` input {pull}28564[28564] +- Update `aws-s3` input to connect to non AWS S3 buckets {issue}28222[28222] {pull}28234[28234] *Heartbeat* diff --git a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl index 74cba05676b..c13c00d2552 100644 --- a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl +++ b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl @@ -90,6 +90,18 @@ # to arrive in the queue before returning. #sqs.wait_time: 20s + # Bucket ARN used for polling AWS S3 buckets + #bucket_arn: arn:aws:s3:::test-s3-bucket + + # Bucket Name used for polling non-AWS S3 buckets + #non_aws_bucket_name: test-s3-bucket + + # Configures the AWS S3 API to use path style instead of virtual host style (default) + #path_style: false + + # Overrides the `cloud.provider` field for non-AWS S3 buckets. See docs for auto recognized providers. + #provider: minio + #------------------------------ AWS CloudWatch input -------------------------------- # Beta: Config options for AWS CloudWatch input #- type: aws-cloudwatch diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 5ed5886efde..1c3b23e9267 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -61,6 +61,28 @@ Listing of the S3 bucket will be polled according the time interval defined by expand_event_list_from_field: Records ---- + +The `aws-s3` input can also poll 3rd party S3 compatible services such as the self hosted Minio. +Using non-AWS S3 compatible buckets requires the use of `access_key_id` and `secret_access_key` for authentication. +To specify the S3 bucket name, use the `non_aws_bucket_name` config and the `endpoint` must be set to replace the default API endpoint. +`endpoint` should be a full URI in the form of `https(s)://`, that will be used as the API endpoint of the service, or a single domain. +If a domain is provided, the full endpoint URI will be constructed with the region name in the standard form of `https://s3..` supported by AWS and several 3rd party providers. +No `endpoint` is needed if using the native AWS S3 service hosted at `amazonaws.com`. +Please see <> for alternate AWS domains that require a different endpoint. + +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: aws-s3 + non_aws_bucket_name: test-s3-bucket + number_of_workers: 5 + bucket_list_interval: 300s + access_key_id: xxxxxxx + secret_access_key: xxxxxxx + endpoint: https://s3.example.com:9000 + expand_event_list_from_field: Records +---- + The `aws-s3` input supports the following configuration options plus the <<{beatname_lc}-input-{type}-common-options>> described later. @@ -236,7 +258,7 @@ configuring multiline options. [float] ==== `queue_url` -URL of the AWS SQS queue that messages will be received from. (Required when `bucket_arn` is not set). +URL of the AWS SQS queue that messages will be received from. (Required when `bucket_arn` and `non_aws_bucket_name` are not set). [float] ==== `visibility_timeout` @@ -270,7 +292,12 @@ value is `20s`. [float] ==== `bucket_arn` -ARN of the AWS S3 bucket that will be polled for list operation. (Required when `queue_url` is not set). +ARN of the AWS S3 bucket that will be polled for list operation. (Required when `queue_url` and `non_aws_bucket_name` are not set). + +[float] +==== `non_aws_bucket_name` + +Name of the S3 bucket that will be polled for list operation. Required for 3rd party S3 compatible services. (Required when `queue_url` and `bucket_arn` are not set). [float] ==== `bucket_list_interval` @@ -288,6 +315,40 @@ Prefix to apply for the list request to the S3 bucket. Default empty. Number of workers that will process the S3 objects listed. (Required when `bucket_arn` is set). +[float] +==== `provider` + +Name of the 3rd party S3 bucket provider like backblaze or GCP. +The following endpoints/providers will be detected automatically: +|=== +|Domain |Provider +|amazonaws.com, amazonaws.com.cn, c2s.sgov.gov, c2s.ic.gov |aws +|backblazeb2.com |backblaze +|wasabisys.com |wasabi +|digitaloceanspaces.com |digitalocean +|dream.io |dreamhost +|scw.cloud |scaleway +|googleapis.com |gcp +|cloud.it |arubacloud +|linodeobjects.com |linode +|vultrobjects.com |vultr +|appdomain.cloud |ibm +|aliyuncs.com |alibaba +|oraclecloud.com |oracle +|exo.io |exoscale +|upcloudobjects.com |upcloud +|ilandcloud.com |iland +|zadarazios.com |zadara +|=== + + +[float] +==== `path_style` + +Enabling this option sets the bucket name as a path in the API call instead of a subdomain. When enabled +https://.s3...com becomes https://s3...com/. +This is only supported with 3rd party S3 providers. AWS does not support path style. + [float] ==== `aws credentials` diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 91f8bf0bd2e..c10eee26c5e 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -3045,6 +3045,18 @@ filebeat.inputs: # to arrive in the queue before returning. #sqs.wait_time: 20s + # Bucket ARN used for polling AWS S3 buckets + #bucket_arn: arn:aws:s3:::test-s3-bucket + + # Bucket Name used for polling non-AWS S3 buckets + #non_aws_bucket_name: test-s3-bucket + + # Configures the AWS S3 API to use path style instead of virtual host style (default) + #path_style: false + + # Overrides the `cloud.provider` field for non-AWS S3 buckets. See docs for auto recognized providers. + #provider: minio + #------------------------------ AWS CloudWatch input -------------------------------- # Beta: Config options for AWS CloudWatch input #- type: aws-cloudwatch diff --git a/x-pack/filebeat/input/awss3/config.go b/x-pack/filebeat/input/awss3/config.go index 4105fbd9093..d1d4989ead8 100644 --- a/x-pack/filebeat/input/awss3/config.go +++ b/x-pack/filebeat/input/awss3/config.go @@ -5,6 +5,7 @@ package awss3 import ( + "errors" "fmt" "time" @@ -28,12 +29,15 @@ type config struct { MaxNumberOfMessages int `config:"max_number_of_messages"` QueueURL string `config:"queue_url"` BucketARN string `config:"bucket_arn"` + NonAWSBucketName string `config:"non_aws_bucket_name"` BucketListInterval time.Duration `config:"bucket_list_interval"` BucketListPrefix string `config:"bucket_list_prefix"` NumberOfWorkers int `config:"number_of_workers"` AWSConfig awscommon.ConfigAWS `config:",inline"` FileSelectors []fileSelectorConfig `config:"file_selectors"` ReaderConfig readerConfig `config:",inline"` // Reader options to apply when no file_selectors are used. + PathStyle bool `config:"path_style"` + ProviderOverride string `config:"provider"` } func defaultConfig() config { @@ -46,27 +50,33 @@ func defaultConfig() config { SQSMaxReceiveCount: 5, FIPSEnabled: false, MaxNumberOfMessages: 5, + PathStyle: false, } c.ReaderConfig.InitDefaults() return c } func (c *config) Validate() error { - if c.QueueURL == "" && c.BucketARN == "" { - logp.NewLogger(inputName).Warnf("neither queue_url nor bucket_arn were provided, input %s will stop", inputName) - return nil + configs := []bool{c.QueueURL != "", c.BucketARN != "", c.NonAWSBucketName != ""} + enabled := []bool{} + for i := range configs { + if configs[i] { + enabled = append(enabled, configs[i]) + } } - - if c.QueueURL != "" && c.BucketARN != "" { - return fmt.Errorf("queue_url <%v> and bucket_arn <%v> "+ - "cannot be set at the same time", c.QueueURL, c.BucketARN) + if len(enabled) == 0 { + logp.NewLogger(inputName).Warnf("neither queue_url, bucket_arn, non_aws_bucket_name were provided, input %s will stop", inputName) + return nil + } else if len(enabled) > 1 { + return fmt.Errorf("queue_url <%v>, bucket_arn <%v>, non_aws_bucket_name <%v> "+ + "cannot be set at the same time", c.QueueURL, c.BucketARN, c.NonAWSBucketName) } - if c.BucketARN != "" && c.BucketListInterval <= 0 { + if (c.BucketARN != "" || c.NonAWSBucketName != "") && c.BucketListInterval <= 0 { return fmt.Errorf("bucket_list_interval <%v> must be greater than 0", c.BucketListInterval) } - if c.BucketARN != "" && c.NumberOfWorkers <= 0 { + if (c.BucketARN != "" || c.NonAWSBucketName != "") && c.NumberOfWorkers <= 0 { return fmt.Errorf("number_of_workers <%v> must be greater than 0", c.NumberOfWorkers) } @@ -90,6 +100,16 @@ func (c *config) Validate() error { c.APITimeout, c.SQSWaitTime) } + if c.FIPSEnabled && c.NonAWSBucketName != "" { + return errors.New("fips_enabled cannot be used with a non-AWS S3 bucket.") + } + if c.PathStyle && c.NonAWSBucketName == "" { + return errors.New("path_style can only be used when polling non-AWS S3 services") + } + if c.ProviderOverride != "" && c.NonAWSBucketName == "" { + return errors.New("provider can only be overriden when polling non-AWS S3 services") + } + return nil } diff --git a/x-pack/filebeat/input/awss3/config_test.go b/x-pack/filebeat/input/awss3/config_test.go index f8573d60525..e7ece7a6269 100644 --- a/x-pack/filebeat/input/awss3/config_test.go +++ b/x-pack/filebeat/input/awss3/config_test.go @@ -22,22 +22,24 @@ import ( func TestConfig(t *testing.T) { const queueURL = "https://example.com" const s3Bucket = "arn:aws:s3:::aBucket" - makeConfig := func(quequeURL, s3Bucket string) config { + const nonAWSS3Bucket = "minio-bucket" + makeConfig := func(quequeURL, s3Bucket string, nonAWSS3Bucket string) config { // Have a separate copy of defaults in the test to make it clear when // anyone changes the defaults. parserConf := parser.Config{} require.NoError(t, parserConf.Unpack(common.MustNewConfigFrom(""))) return config{ - QueueURL: quequeURL, - BucketARN: s3Bucket, - APITimeout: 120 * time.Second, - VisibilityTimeout: 300 * time.Second, - SQSMaxReceiveCount: 5, - SQSWaitTime: 20 * time.Second, - BucketListInterval: 120 * time.Second, - BucketListPrefix: "", - + QueueURL: quequeURL, + BucketARN: s3Bucket, + NonAWSBucketName: nonAWSS3Bucket, + APITimeout: 120 * time.Second, + VisibilityTimeout: 300 * time.Second, + SQSMaxReceiveCount: 5, + SQSWaitTime: 20 * time.Second, + BucketListInterval: 120 * time.Second, + BucketListPrefix: "", FIPSEnabled: false, + PathStyle: false, MaxNumberOfMessages: 5, ReaderConfig: readerConfig{ BufferSize: 16 * humanize.KiByte, @@ -49,17 +51,19 @@ func TestConfig(t *testing.T) { } testCases := []struct { - name string - queueURL string - s3Bucket string - config common.MapStr - expectedErr string - expectedCfg func(queueURL, s3Bucket string) config + name string + queueURL string + s3Bucket string + nonAWSS3Bucket string + config common.MapStr + expectedErr string + expectedCfg func(queueURL, s3Bucket string, nonAWSS3Bucket string) config }{ { "input with defaults for queueURL", queueURL, "", + "", common.MapStr{ "queue_url": queueURL, }, @@ -70,13 +74,14 @@ func TestConfig(t *testing.T) { "input with defaults for s3Bucket", "", s3Bucket, + "", common.MapStr{ "bucket_arn": s3Bucket, "number_of_workers": 5, }, "", - func(queueURL, s3Bucket string) config { - c := makeConfig("", s3Bucket) + func(queueURL, s3Bucket string, nonAWSS3Bucket string) config { + c := makeConfig("", s3Bucket, "") c.NumberOfWorkers = 5 return c }, @@ -85,6 +90,7 @@ func TestConfig(t *testing.T) { "input with file_selectors", queueURL, "", + "", common.MapStr{ "queue_url": queueURL, "file_selectors": []common.MapStr{ @@ -94,8 +100,8 @@ func TestConfig(t *testing.T) { }, }, "", - func(queueURL, s3Bucket string) config { - c := makeConfig(queueURL, "") + func(queueURL, s3Bucket string, nonAWSS3Bucket string) config { + c := makeConfig(queueURL, "", "") regex := match.MustCompile("/CloudTrail/") c.FileSelectors = []fileSelectorConfig{ { @@ -110,30 +116,70 @@ func TestConfig(t *testing.T) { "error on no queueURL and s3Bucket", "", "", + "", common.MapStr{ "queue_url": "", "bucket_arn": "", }, "", - func(queueURL, s3Bucket string) config { - return makeConfig("", "") + func(queueURL, s3Bucket string, nonAWSS3Bucket string) config { + return makeConfig("", "", "") }, }, { "error on both queueURL and s3Bucket", queueURL, s3Bucket, + "", common.MapStr{ "queue_url": queueURL, "bucket_arn": s3Bucket, }, - "queue_url and bucket_arn cannot be set at the same time", + "queue_url , bucket_arn , non_aws_bucket_name <> cannot be set at the same time", + nil, + }, + { + "error on both queueURL and NonAWSS3Bucket", + queueURL, + "", + nonAWSS3Bucket, + common.MapStr{ + "queue_url": queueURL, + "non_aws_bucket_name": nonAWSS3Bucket, + }, + "queue_url , bucket_arn <>, non_aws_bucket_name cannot be set at the same time", + nil, + }, + { + "error on both s3Bucket and NonAWSS3Bucket", + "", + s3Bucket, + nonAWSS3Bucket, + common.MapStr{ + "bucket_arn": s3Bucket, + "non_aws_bucket_name": nonAWSS3Bucket, + }, + "queue_url <>, bucket_arn , non_aws_bucket_name cannot be set at the same time", + nil, + }, + { + "error on queueURL, s3Bucket, and NonAWSS3Bucket", + queueURL, + s3Bucket, + nonAWSS3Bucket, + common.MapStr{ + "queue_url": queueURL, + "bucket_arn": s3Bucket, + "non_aws_bucket_name": nonAWSS3Bucket, + }, + "queue_url , bucket_arn , non_aws_bucket_name cannot be set at the same time", nil, }, { "error on api_timeout == 0", queueURL, "", + "", common.MapStr{ "queue_url": queueURL, "api_timeout": "0", @@ -145,6 +191,7 @@ func TestConfig(t *testing.T) { "error on visibility_timeout == 0", queueURL, "", + "", common.MapStr{ "queue_url": queueURL, "visibility_timeout": "0", @@ -156,6 +203,7 @@ func TestConfig(t *testing.T) { "error on visibility_timeout > 12h", queueURL, "", + "", common.MapStr{ "queue_url": queueURL, "visibility_timeout": "12h1ns", @@ -167,6 +215,7 @@ func TestConfig(t *testing.T) { "error on bucket_list_interval == 0", "", s3Bucket, + "", common.MapStr{ "bucket_arn": s3Bucket, "bucket_list_interval": "0", @@ -178,6 +227,7 @@ func TestConfig(t *testing.T) { "error on number_of_workers == 0", "", s3Bucket, + "", common.MapStr{ "bucket_arn": s3Bucket, "number_of_workers": "0", @@ -189,6 +239,7 @@ func TestConfig(t *testing.T) { "error on max_number_of_messages == 0", queueURL, "", + "", common.MapStr{ "queue_url": queueURL, "max_number_of_messages": "0", @@ -200,6 +251,7 @@ func TestConfig(t *testing.T) { "error on buffer_size == 0 ", queueURL, "", + "", common.MapStr{ "queue_url": queueURL, "buffer_size": "0", @@ -211,6 +263,7 @@ func TestConfig(t *testing.T) { "error on max_bytes == 0 ", queueURL, "", + "", common.MapStr{ "queue_url": queueURL, "max_bytes": "0", @@ -222,6 +275,7 @@ func TestConfig(t *testing.T) { "error on expand_event_list_from_field and content_type != application/json ", queueURL, "", + "", common.MapStr{ "queue_url": queueURL, "expand_event_list_from_field": "Records", @@ -234,6 +288,7 @@ func TestConfig(t *testing.T) { "error on expand_event_list_from_field and content_type != application/json ", "", s3Bucket, + "", common.MapStr{ "bucket_arn": s3Bucket, "expand_event_list_from_field": "Records", @@ -242,6 +297,87 @@ func TestConfig(t *testing.T) { "content_type must be `application/json` when expand_event_list_from_field is used", nil, }, + { + "input with defaults for non-AWS S3 Bucket", + "", + "", + nonAWSS3Bucket, + common.MapStr{ + "non_aws_bucket_name": nonAWSS3Bucket, + "number_of_workers": 5, + }, + "", + func(queueURL, s3Bucket string, nonAWSS3Bucket string) config { + c := makeConfig("", "", nonAWSS3Bucket) + c.NumberOfWorkers = 5 + return c + }, + }, + { + "error on FIPS with non-AWS S3 Bucket", + "", + "", + nonAWSS3Bucket, + common.MapStr{ + "non_aws_bucket_name": nonAWSS3Bucket, + "number_of_workers": 5, + "fips_enabled": true, + }, + "fips_enabled cannot be used with a non-AWS S3 bucket.", + nil, + }, + { + "error on path_style with AWS native S3 Bucket", + "", + s3Bucket, + "", + common.MapStr{ + "bucket_arn": s3Bucket, + "number_of_workers": 5, + "path_style": true, + }, + "path_style can only be used when polling non-AWS S3 services", + nil, + }, + { + "error on path_style with AWS SQS Queue", + queueURL, + "", + "", + common.MapStr{ + "queue_url": queueURL, + "number_of_workers": 5, + "path_style": true, + }, + "path_style can only be used when polling non-AWS S3 services", + nil, + }, + { + "error on provider with AWS native S3 Bucket", + "", + s3Bucket, + "", + common.MapStr{ + "bucket_arn": s3Bucket, + "number_of_workers": 5, + "provider": "asdf", + }, + "provider can only be overriden when polling non-AWS S3 services", + nil, + }, + { + "error on provider with AWS SQS Queue", + queueURL, + "", + "", + common.MapStr{ + "queue_url": queueURL, + "number_of_workers": 5, + "provider": "asdf", + }, + "provider can only be overriden when polling non-AWS S3 services", + nil, + }, } for _, tc := range testCases { @@ -260,7 +396,7 @@ func TestConfig(t *testing.T) { if tc.expectedCfg == nil { t.Fatal("missing expected config in test case") } - assert.EqualValues(t, tc.expectedCfg(tc.queueURL, tc.s3Bucket), c) + assert.EqualValues(t, tc.expectedCfg(tc.queueURL, tc.s3Bucket, tc.nonAWSS3Bucket), c) }) } } diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 4cd5a2c82b7..67edf3b5919 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -138,7 +138,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { } } - if in.config.BucketARN != "" { + if in.config.BucketARN != "" || in.config.NonAWSBucketName != "" { // Create S3 receiver and S3 notification processor. poller, err := in.createS3Lister(inputContext, ctx, client, persistentStore, states) if err != nil { @@ -198,21 +198,29 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli if in.config.FIPSEnabled { s3ServiceName = "s3-fips" } - + var bucketName string + var bucketID string + if in.config.NonAWSBucketName != "" { + bucketName = in.config.NonAWSBucketName + bucketID = bucketName + } else if in.config.BucketARN != "" { + bucketName = getBucketNameFromARN(in.config.BucketARN) + bucketID = in.config.BucketARN + } s3Client := s3.New(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, s3ServiceName, in.awsConfig.Region, in.awsConfig)) - regionName, err := getRegionForBucketARN(cancelCtx, s3Client, in.config.BucketARN) + regionName, err := getRegionForBucket(cancelCtx, s3Client, bucketName) if err != nil { - return nil, fmt.Errorf("failed to get AWS region for bucket_arn: %w", err) + return nil, fmt.Errorf("failed to get AWS region for bucket: %w", err) } - in.awsConfig.Region = regionName s3Client = s3.New(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, s3ServiceName, in.awsConfig.Region, in.awsConfig)) + s3Client.ForcePathStyle = in.config.PathStyle s3API := &awsS3API{ client: s3Client, } - log := ctx.Logger.With("bucket_arn", in.config.BucketARN) + log := ctx.Logger.With("bucket", bucketID) log.Infof("number_of_workers is set to %v.", in.config.NumberOfWorkers) log.Infof("bucket_list_interval is set to %v.", in.config.BucketListInterval) log.Infof("bucket_list_prefix is set to %v.", in.config.BucketListPrefix) @@ -233,9 +241,10 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli s3EventHandlerFactory, states, persistentStore, - in.config.BucketARN, + bucketID, in.config.BucketListPrefix, in.awsConfig.Region, + getProviderFromDomain(in.config.AWSConfig.Endpoint, in.config.ProviderOverride), in.config.NumberOfWorkers, in.config.BucketListInterval) @@ -258,10 +267,7 @@ func getRegionFromQueueURL(queueURL string, endpoint string) (string, error) { return "", fmt.Errorf("QueueURL is not in format: https://sqs.{REGION_ENDPOINT}.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME}") } -func getRegionForBucketARN(ctx context.Context, s3Client *s3.Client, bucketARN string) (string, error) { - bucketMetadata := strings.Split(bucketARN, ":") - bucketName := bucketMetadata[len(bucketMetadata)-1] - +func getRegionForBucket(ctx context.Context, s3Client *s3.Client, bucketName string) (string, error) { req := s3Client.GetBucketLocationRequest(&s3.GetBucketLocationInput{ Bucket: awssdk.String(bucketName), }) @@ -273,3 +279,50 @@ func getRegionForBucketARN(ctx context.Context, s3Client *s3.Client, bucketARN s return string(s3.NormalizeBucketLocation(resp.LocationConstraint)), nil } + +func getBucketNameFromARN(bucketARN string) string { + bucketMetadata := strings.Split(bucketARN, ":") + bucketName := bucketMetadata[len(bucketMetadata)-1] + return bucketName +} + +func getProviderFromDomain(endpoint string, ProviderOverride string) string { + if ProviderOverride != "" { + return ProviderOverride + } + if endpoint == "" { + return "aws" + } + // List of popular S3 SaaS providers + var providers = map[string]string{ + "amazonaws.com": "aws", + "c2s.sgov.gov": "aws", + "c2s.ic.gov": "aws", + "amazonaws.com.cn": "aws", + "backblazeb2.com": "backblaze", + "wasabisys.com": "wasabi", + "digitaloceanspaces.com": "digitalocean", + "dream.io": "dreamhost", + "scw.cloud": "scaleway", + "googleapis.com": "gcp", + "cloud.it": "arubacloud", + "linodeobjects.com": "linode", + "vultrobjects.com": "vultr", + "appdomain.cloud": "ibm", + "aliyuncs.com": "alibaba", + "oraclecloud.com": "oracle", + "exo.io": "exoscale", + "upcloudobjects.com": "upcloud", + "ilandcloud.com": "iland", + "zadarazios.com": "zadara", + } + + parsedEndpoint, _ := url.Parse(endpoint) + domain := parsedEndpoint.Hostname() + for key, provider := range providers { + if strings.HasSuffix(domain, key) { + return provider + } + } + return "unknown" +} diff --git a/x-pack/filebeat/input/awss3/input_benchmark_test.go b/x-pack/filebeat/input/awss3/input_benchmark_test.go index 9c718c83ac8..aabb86b1a6c 100644 --- a/x-pack/filebeat/input/awss3/input_benchmark_test.go +++ b/x-pack/filebeat/input/awss3/input_benchmark_test.go @@ -277,7 +277,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult } s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, conf.FileSelectors) - s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", "key-", "region", numberOfWorkers, time.Second) + s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", "key-", "region", "provider", numberOfWorkers, time.Second) ctx, cancel := context.WithCancel(context.Background()) b.Cleanup(cancel) diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 74866641e11..0ce1c85f505 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -368,6 +368,11 @@ func drainSQS(t *testing.T, tfConfig terraformOutputData) { t.Logf("Drained %d SQS messages.", deletedCount) } +func TestGetBucketNameFromARN(t *testing.T) { + bucketName := getBucketNameFromARN("arn:aws:s3:::my_corporate_bucket") + assert.Equal("my_corporate_bucket", bucketName) +} + func TestGetRegionForBucketARN(t *testing.T) { logp.TestingSetup() @@ -381,7 +386,7 @@ func TestGetRegionForBucketARN(t *testing.T) { s3Client := s3.New(awscommon.EnrichAWSConfigWithEndpoint("", "s3", "", awsConfig)) - regionName, err := getRegionForBucketARN(context.Background(), s3Client, tfConfig.BucketName) + regionName, err := getRegionForBucket(context.Background(), s3Client, getBucketNameFromARN(tfConfig.BucketName)) assert.Equal(t, tfConfig.AWSRegion, regionName) } @@ -433,3 +438,10 @@ func TestPaginatorListPrefix(t *testing.T) { assert.Equal(t, expected, objects) } + +func TestGetProviderFromDomain(t *testing.T) { + assert.Equal("aws", getProviderFromDomain("", "")) + assert.Equal("aws", getProviderFromDomain("c2s.ic.gov", "")) + assert.Equal("abc", getProviderFromDomain("abc.com", "abc")) + assert.Equal("xyz", getProviderFromDomain("oraclecloud.com", "xyz")) +} diff --git a/x-pack/filebeat/input/awss3/s3.go b/x-pack/filebeat/input/awss3/s3.go index d69c9eccb44..1688ca7ebc8 100644 --- a/x-pack/filebeat/input/awss3/s3.go +++ b/x-pack/filebeat/input/awss3/s3.go @@ -7,7 +7,6 @@ package awss3 import ( "context" "net/url" - "strings" "sync" "time" @@ -43,6 +42,7 @@ type s3Poller struct { bucket string listPrefix string region string + provider string bucketPollInterval time.Duration workerSem *sem s3 s3API @@ -64,6 +64,7 @@ func newS3Poller(log *logp.Logger, bucket string, listPrefix string, awsRegion string, + provider string, numberOfWorkers int, bucketPollInterval time.Duration) *s3Poller { if metrics == nil { @@ -74,6 +75,7 @@ func newS3Poller(log *logp.Logger, bucket: bucket, listPrefix: listPrefix, region: awsRegion, + provider: provider, bucketPollInterval: bucketPollInterval, workerSem: newSem(numberOfWorkers), s3: s3, @@ -142,8 +144,7 @@ func (p *s3Poller) ProcessObject(s3ObjectPayloadChan <-chan *s3ObjectPayload) er func (p *s3Poller) GetS3Objects(ctx context.Context, s3ObjectPayloadChan chan<- *s3ObjectPayload) { defer close(s3ObjectPayloadChan) - bucketMetadata := strings.Split(p.bucket, ":") - bucketName := bucketMetadata[len(bucketMetadata)-1] + bucketName := getBucketNameFromARN(p.bucket) paginator := p.s3.ListObjectsPaginator(bucketName, p.listPrefix) for paginator.Next(ctx) { @@ -185,6 +186,7 @@ func (p *s3Poller) GetS3Objects(ctx context.Context, s3ObjectPayloadChan chan<- event := s3EventV2{} event.AWSRegion = p.region + event.Provider = p.provider event.S3.Bucket.Name = bucketName event.S3.Bucket.ARN = p.bucket event.S3.Object.Key = filename diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go index 3b550a08997..2839c31e225 100644 --- a/x-pack/filebeat/input/awss3/s3_objects.go +++ b/x-pack/filebeat/input/awss3/s3_objects.go @@ -105,6 +105,7 @@ type s3ObjectProcessor struct { readerConfig *readerConfig // Config about how to process the object. s3Obj s3EventV2 // S3 object information. s3ObjHash string + s3RequestURL string s3Metadata map[string]interface{} // S3 object metadata. } @@ -169,6 +170,7 @@ func (p *s3ObjectProcessor) ProcessS3Object() error { // close the returned reader. func (p *s3ObjectProcessor) download() (contentType string, metadata map[string]interface{}, body io.ReadCloser, err error) { resp, err := p.s3.GetObject(p.ctx, p.s3Obj.S3.Bucket.Name, p.s3Obj.S3.Object.Key) + if err != nil { return "", nil, nil, err } @@ -176,6 +178,7 @@ func (p *s3ObjectProcessor) download() (contentType string, metadata map[string] if resp == nil { return "", nil, nil, errors.New("empty response from s3 get object") } + p.s3RequestURL = resp.SDKResponseMetdata().Request.HTTPRequest.URL.String() meta := s3Metadata(resp, p.readerConfig.IncludeS3Metadata...) if resp.ContentType == nil { @@ -218,7 +221,7 @@ func (p *s3ObjectProcessor) readJSON(r io.Reader) error { } data, _ := item.MarshalJSON() - evt := createEvent(string(data), offset, p.s3Obj, p.s3ObjHash, p.s3Metadata) + evt := p.createEvent(string(data), offset) p.publish(p.acker, &evt) } @@ -257,7 +260,8 @@ func (p *s3ObjectProcessor) splitEventList(key string, raw json.RawMessage, offs } data, _ := item.MarshalJSON() - evt := createEvent(string(data), offset+arrayOffset, p.s3Obj, objHash, p.s3Metadata) + p.s3ObjHash = objHash + evt := p.createEvent(string(data), offset+arrayOffset) p.publish(p.acker, &evt) } @@ -301,7 +305,7 @@ func (p *s3ObjectProcessor) readFile(r io.Reader) error { return fmt.Errorf("error reading message: %w", err) } - event := createEvent(string(message.Content), offset, p.s3Obj, p.s3ObjHash, p.s3Metadata) + event := p.createEvent(string(message.Content), offset) event.Fields.DeepUpdate(message.Fields) offset += int64(message.Bytes) p.publish(p.acker, &event) @@ -317,7 +321,7 @@ func (p *s3ObjectProcessor) publish(ack *eventACKTracker, event *beat.Event) { p.publisher.Publish(*event) } -func createEvent(message string, offset int64, obj s3EventV2, objectHash string, meta map[string]interface{}) beat.Event { +func (p *s3ObjectProcessor) createEvent(message string, offset int64) beat.Event { event := beat.Event{ Timestamp: time.Now().UTC(), Fields: common.MapStr{ @@ -325,29 +329,29 @@ func createEvent(message string, offset int64, obj s3EventV2, objectHash string, "log": common.MapStr{ "offset": offset, "file": common.MapStr{ - "path": constructObjectURL(obj), + "path": p.s3RequestURL, }, }, "aws": common.MapStr{ "s3": common.MapStr{ "bucket": common.MapStr{ - "name": obj.S3.Bucket.Name, - "arn": obj.S3.Bucket.ARN}, + "name": p.s3Obj.S3.Bucket.Name, + "arn": p.s3Obj.S3.Bucket.ARN}, "object": common.MapStr{ - "key": obj.S3.Object.Key, + "key": p.s3Obj.S3.Object.Key, }, }, }, "cloud": common.MapStr{ - "provider": "aws", - "region": obj.AWSRegion, + "provider": p.s3Obj.Provider, + "region": p.s3Obj.AWSRegion, }, }, } - event.SetID(objectID(objectHash, offset)) + event.SetID(objectID(p.s3ObjHash, offset)) - if len(meta) > 0 { - event.Fields.Put("aws.s3.metadata", meta) + if len(p.s3Metadata) > 0 { + event.Fields.Put("aws.s3.metadata", p.s3Metadata) } return event @@ -357,10 +361,6 @@ func objectID(objectHash string, offset int64) string { return fmt.Sprintf("%s-%012d", objectHash, offset) } -func constructObjectURL(obj s3EventV2) string { - return "https://" + obj.S3.Bucket.Name + ".s3." + obj.AWSRegion + ".amazonaws.com/" + obj.S3.Object.Key -} - // s3ObjectHash returns a short sha256 hash of the bucket arn + object key name. func s3ObjectHash(obj s3EventV2) string { h := sha256.New() diff --git a/x-pack/filebeat/input/awss3/s3_objects_test.go b/x-pack/filebeat/input/awss3/s3_objects_test.go index 04b6848f1ee..952fbb757dc 100644 --- a/x-pack/filebeat/input/awss3/s3_objects_test.go +++ b/x-pack/filebeat/input/awss3/s3_objects_test.go @@ -9,6 +9,8 @@ import ( "context" "errors" "io/ioutil" + "net/http" + "net/url" "path/filepath" "strings" "testing" @@ -36,13 +38,26 @@ func newS3Object(t testing.TB, filename, contentType string) (s3EventV2, *s3.Get func newS3GetObjectResponse(filename string, data []byte, contentType string) *s3.GetObjectResponse { r := bytes.NewReader(data) contentLen := int64(r.Len()) - resp := &s3.GetObjectResponse{ - GetObjectOutput: &s3.GetObjectOutput{ - Body: ioutil.NopCloser(r), - ContentLength: &contentLen, + + req := &s3.GetObjectRequest{ + Request: &awssdk.Request{ + HTTPRequest: &http.Request{ + URL: &url.URL{Path: filename}, + }, + Retryer: awssdk.NoOpRetryer{}, + Data: &s3.GetObjectOutput{ + Body: ioutil.NopCloser(r), + ContentLength: &contentLen, + }, + }, + Input: &s3.GetObjectInput{ + Bucket: awssdk.String("dummy_bucket"), + Key: awssdk.String(filename), }, } + resp, _ := req.Send(context.Background()) + if contentType != "" { resp.ContentType = &contentType } diff --git a/x-pack/filebeat/input/awss3/s3_test.go b/x-pack/filebeat/input/awss3/s3_test.go index 250730ab055..b41349c1c8b 100644 --- a/x-pack/filebeat/input/awss3/s3_test.go +++ b/x-pack/filebeat/input/awss3/s3_test.go @@ -133,7 +133,7 @@ func TestS3Poller(t *testing.T) { Return(nil, errFakeConnectivityFailure) s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, mockPublisher, nil) - receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, "key", "region", numberOfWorkers, pollInterval) + receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, "key", "region", "provider", numberOfWorkers, pollInterval) require.Error(t, context.DeadlineExceeded, receiver.Poll(ctx)) assert.Equal(t, numberOfWorkers, receiver.workerSem.available) }) @@ -263,7 +263,7 @@ func TestS3Poller(t *testing.T) { Return(nil, errFakeConnectivityFailure) s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, mockPublisher, nil) - receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, "key", "region", numberOfWorkers, pollInterval) + receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, "key", "region", "provider", numberOfWorkers, pollInterval) require.Error(t, context.DeadlineExceeded, receiver.Poll(ctx)) assert.Equal(t, numberOfWorkers, receiver.workerSem.available) }) diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event.go b/x-pack/filebeat/input/awss3/sqs_s3_event.go index aba2c777739..b2c1a7f169d 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event.go @@ -60,6 +60,7 @@ type s3EventsV2 struct { // s3EventV2 is a S3 change notification event. type s3EventV2 struct { AWSRegion string `json:"awsRegion"` + Provider string `json:"provider"` EventName string `json:"eventName"` EventSource string `json:"eventSource"` S3 struct { diff --git a/x-pack/filebeat/input/awss3/sqs_test.go b/x-pack/filebeat/input/awss3/sqs_test.go index 036ab3c13dc..4940b4a6eca 100644 --- a/x-pack/filebeat/input/awss3/sqs_test.go +++ b/x-pack/filebeat/input/awss3/sqs_test.go @@ -131,6 +131,7 @@ func newS3Event(key string) s3EventV2 { AWSRegion: "us-east-1", EventSource: "aws:s3", EventName: "ObjectCreated:Put", + Provider: "aws", } record.S3.Bucket.Name = "foo" record.S3.Bucket.ARN = "arn:aws:s3:::foo" diff --git a/x-pack/libbeat/common/aws/credentials.go b/x-pack/libbeat/common/aws/credentials.go index deef1196916..8a24123c724 100644 --- a/x-pack/libbeat/common/aws/credentials.go +++ b/x-pack/libbeat/common/aws/credentials.go @@ -138,10 +138,15 @@ func getRoleArn(config ConfigAWS, awsConfig awssdk.Config) awssdk.Config { // service clients when endpoint is given in config. func EnrichAWSConfigWithEndpoint(endpoint string, serviceName string, regionName string, awsConfig awssdk.Config) awssdk.Config { if endpoint != "" { - if regionName == "" { - awsConfig.EndpointResolver = awssdk.ResolveWithEndpointURL("https://" + serviceName + "." + endpoint) + parsedEndpoint, _ := url.Parse(endpoint) + if parsedEndpoint.Scheme != "" { + awsConfig.EndpointResolver = awssdk.ResolveWithEndpointURL(endpoint) } else { - awsConfig.EndpointResolver = awssdk.ResolveWithEndpointURL("https://" + serviceName + "." + regionName + "." + endpoint) + if regionName == "" { + awsConfig.EndpointResolver = awssdk.ResolveWithEndpointURL("https://" + serviceName + "." + endpoint) + } else { + awsConfig.EndpointResolver = awssdk.ResolveWithEndpointURL("https://" + serviceName + "." + regionName + "." + endpoint) + } } } return awsConfig diff --git a/x-pack/libbeat/common/aws/credentials_test.go b/x-pack/libbeat/common/aws/credentials_test.go index 2d9d6e9664c..dde6a0665e2 100644 --- a/x-pack/libbeat/common/aws/credentials_test.go +++ b/x-pack/libbeat/common/aws/credentials_test.go @@ -58,6 +58,26 @@ func TestEnrichAWSConfigWithEndpoint(t *testing.T) { EndpointResolver: awssdk.ResolveWithEndpointURL("https://cloudwatch.us-west-1.amazonaws.com"), }, }, + { + "full URI endpoint", + "https://s3.test.com:9000", + "s3", + "", + awssdk.Config{}, + awssdk.Config{ + EndpointResolver: awssdk.ResolveWithEndpointURL("https://s3.test.com:9000"), + }, + }, + { + "full non HTTPS URI endpoint", + "http://testobjects.com:9000", + "s3", + "", + awssdk.Config{}, + awssdk.Config{ + EndpointResolver: awssdk.ResolveWithEndpointURL("http://testobjects.com:9000"), + }, + }, } for _, c := range cases { t.Run(c.title, func(t *testing.T) {