From ea8f10cc8f6a44dfdea2be48826f5719c1bdce95 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Tue, 25 Jan 2022 08:15:24 -0700 Subject: [PATCH] Support running multiple log groups in cloudwatch input (#29695) --- CHANGELOG.next.asciidoc | 1 + .../docs/inputs/input-aws-cloudwatch.asciidoc | 8 +- .../awscloudwatch/_meta/terraform/.gitignore | 3 + .../_meta/terraform/.terraform.lock.hcl | 57 +++ .../awscloudwatch/_meta/terraform/README.md | 46 ++ .../awscloudwatch/_meta/terraform/main.tf | 44 ++ .../awscloudwatch/_meta/terraform/outputs.tf | 11 + .../_meta/terraform/variables.tf | 5 + .../input/awscloudwatch/cloudwatch.go | 126 ++++++ x-pack/filebeat/input/awscloudwatch/config.go | 12 +- x-pack/filebeat/input/awscloudwatch/input.go | 393 ++++++++---------- .../awscloudwatch/input_integration_test.go | 232 +++++++++++ .../input/awscloudwatch/input_test.go | 9 - .../filebeat/input/awscloudwatch/metrics.go | 39 ++ .../filebeat/input/awscloudwatch/processor.go | 78 ++++ x-pack/filebeat/input/awss3/input.go | 2 +- .../input/awss3/input_benchmark_test.go | 6 +- .../input/awss3/input_integration_test.go | 6 +- x-pack/filebeat/input/awss3/interfaces.go | 4 +- .../input/awss3/mock_interfaces_test.go | 3 +- x-pack/filebeat/input/awss3/s3.go | 7 +- x-pack/filebeat/input/awss3/s3_objects.go | 12 +- .../filebeat/input/awss3/s3_objects_test.go | 11 +- x-pack/filebeat/input/awss3/s3_test.go | 11 +- x-pack/filebeat/input/awss3/sqs.go | 5 +- x-pack/filebeat/input/awss3/sqs_s3_event.go | 4 +- .../filebeat/input/awss3/sqs_s3_event_test.go | 3 +- x-pack/filebeat/input/awss3/sqs_test.go | 4 +- .../input/default-inputs/inputs_other.go | 2 + .../awss3 => libbeat/common/aws}/acker.go | 34 +- .../common/aws}/acker_test.go | 20 +- .../awss3 => libbeat/common/aws}/semaphore.go | 16 +- .../common/aws}/semaphore_test.go | 4 +- 33 files changed, 907 insertions(+), 311 deletions(-) create mode 100644 x-pack/filebeat/input/awscloudwatch/_meta/terraform/.gitignore create mode 100644 x-pack/filebeat/input/awscloudwatch/_meta/terraform/.terraform.lock.hcl create mode 100644 x-pack/filebeat/input/awscloudwatch/_meta/terraform/README.md create mode 100644 x-pack/filebeat/input/awscloudwatch/_meta/terraform/main.tf create mode 100644 x-pack/filebeat/input/awscloudwatch/_meta/terraform/outputs.tf create mode 100644 x-pack/filebeat/input/awscloudwatch/_meta/terraform/variables.tf create mode 100644 x-pack/filebeat/input/awscloudwatch/cloudwatch.go create mode 100644 x-pack/filebeat/input/awscloudwatch/input_integration_test.go create mode 100644 x-pack/filebeat/input/awscloudwatch/metrics.go create mode 100644 x-pack/filebeat/input/awscloudwatch/processor.go rename x-pack/{filebeat/input/awss3 => libbeat/common/aws}/acker.go (75%) rename x-pack/{filebeat/input/awss3 => libbeat/common/aws}/acker_test.go (79%) rename x-pack/{filebeat/input/awss3 => libbeat/common/aws}/semaphore.go (81%) rename x-pack/{filebeat/input/awss3 => libbeat/common/aws}/semaphore_test.go (95%) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 404be94402e..92e3291317e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -111,6 +111,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Undo deletion of endpoint config from cloudtrail fileset in {pull}29415[29415]. {pull}29450[29450] - Make Cisco ASA and FTD modules conform to the ECS definition for event.outcome and event.type. {issue}29581[29581] {pull}29698[29698] - ibmmq: Fixed `@timestamp` not being populated with correct values. 
{pull}29773[29773] +- Fix using log_group_name_prefix in aws-cloudwatch input. {pull}29695[29695] - aws-s3: Improve gzip detection to avoid false negatives. {issue}29968[29968] *Heartbeat* diff --git a/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc index 71f66674387..1df0fb99a2e 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc @@ -50,12 +50,18 @@ log_group_name is given. ==== `log_group_name_prefix` The prefix for a group of log group names. Note: `region_name` is required when log_group_name_prefix is given. `log_group_name` and `log_group_name_prefix` -cannot be given at the same time. +cannot be given at the same time. The number of workers that will process the +log groups under this prefix is set through the `number_of_workers` config. [float] ==== `region_name` Region that the specified log group or log group prefix belongs to. +[float] +==== `number_of_workers` +Number of workers that will process the log groups with the given `log_group_name_prefix`. +Default value is 1. + [float] ==== `log_streams` A list of strings of log streams names that Filebeat collect log events from. diff --git a/x-pack/filebeat/input/awscloudwatch/_meta/terraform/.gitignore b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/.gitignore new file mode 100644 index 00000000000..0825744a776 --- /dev/null +++ b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/.gitignore @@ -0,0 +1,3 @@ +terraform/ +outputs.yml +*.tfstate* diff --git a/x-pack/filebeat/input/awscloudwatch/_meta/terraform/.terraform.lock.hcl b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/.terraform.lock.hcl new file mode 100644 index 00000000000..7f6381c60af --- /dev/null +++ b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/.terraform.lock.hcl @@ -0,0 +1,57 @@ +# This file is maintained automatically by "terraform init". +# Manual edits may be lost in future updates. 
+ +provider "registry.terraform.io/hashicorp/aws" { + version = "3.70.0" + constraints = "~> 3.52" + hashes = [ + "h1:jn4ImGMZJ9rQdaVSbcCBqUqnhRSpyaM1DivqaNuP+eg=", + "zh:0af710e528e21b930899f0ac295b0ceef8ad7b623dd8f38e92c8ec4bc7af0321", + "zh:4cabcd4519c0aae474d91ae67a8e3a4a8c39c3945c289a9cf7c1409f64409abe", + "zh:58da1a436facb4e4f95cd2870d211ed7bcb8cf721a4a61970aa8da191665f2aa", + "zh:6465339475c1cd3c16a5c8fee61304dcad2c4a27740687d29c6cdc90d2e6423d", + "zh:7a821ed053c355d70ebe33185590953fa5c364c1f3d66fe3f9b4aba3961646b1", + "zh:7c3656cc9cc1739dcb298e7930c9a76ccfce738d2070841d7e6c62fbdae74eef", + "zh:9d9da9e3c60a0c977e156da8590f36a219ae91994bb3df5a1208de2ab3ceeba7", + "zh:a3138817c86bf3e4dca7fd3a92e099cd1bf1d45ee7c7cc9e9773ba04fc3b315a", + "zh:a8603044e935dfb3cb9319a46d26276162c6aea75e02c4827232f9c6029a3182", + "zh:aef9482332bf43d0b73317f5909dec9e95b983c67b10d72e75eacc7c4f37d084", + "zh:fc3f3cad84f2eebe566dd0b65904c934093007323b9b85e73d9dd4535ceeb29d", + ] +} + +provider "registry.terraform.io/hashicorp/local" { + version = "2.1.0" + hashes = [ + "h1:KfieWtVyGWwplSoLIB5usKAUnrIkDQBkWaR5TI+4WYg=", + "zh:0f1ec65101fa35050978d483d6e8916664b7556800348456ff3d09454ac1eae2", + "zh:36e42ac19f5d68467aacf07e6adcf83c7486f2e5b5f4339e9671f68525fc87ab", + "zh:6db9db2a1819e77b1642ec3b5e95042b202aee8151a0256d289f2e141bf3ceb3", + "zh:719dfd97bb9ddce99f7d741260b8ece2682b363735c764cac83303f02386075a", + "zh:7598bb86e0378fd97eaa04638c1a4c75f960f62f69d3662e6d80ffa5a89847fe", + "zh:ad0a188b52517fec9eca393f1e2c9daea362b33ae2eb38a857b6b09949a727c1", + "zh:c46846c8df66a13fee6eff7dc5d528a7f868ae0dcf92d79deaac73cc297ed20c", + "zh:dc1a20a2eec12095d04bf6da5321f535351a594a636912361db20eb2a707ccc4", + "zh:e57ab4771a9d999401f6badd8b018558357d3cbdf3d33cc0c4f83e818ca8e94b", + "zh:ebdcde208072b4b0f8d305ebf2bfdc62c926e0717599dcf8ec2fd8c5845031c3", + "zh:ef34c52b68933bedd0868a13ccfd59ff1c820f299760b3c02e008dc95e2ece91", + ] +} + +provider "registry.terraform.io/hashicorp/random" { + version = "3.1.0" + hashes = [ + "h1:rKYu5ZUbXwrLG1w81k7H3nce/Ys6yAxXhWcbtk36HjY=", + "zh:2bbb3339f0643b5daa07480ef4397bd23a79963cc364cdfbb4e86354cb7725bc", + "zh:3cd456047805bf639fbf2c761b1848880ea703a054f76db51852008b11008626", + "zh:4f251b0eda5bb5e3dc26ea4400dba200018213654b69b4a5f96abee815b4f5ff", + "zh:7011332745ea061e517fe1319bd6c75054a314155cb2c1199a5b01fe1889a7e2", + "zh:738ed82858317ccc246691c8b85995bc125ac3b4143043219bd0437adc56c992", + "zh:7dbe52fac7bb21227acd7529b487511c91f4107db9cc4414f50d04ffc3cab427", + "zh:a3a9251fb15f93e4cfc1789800fc2d7414bbc18944ad4c5c98f466e6477c42bc", + "zh:a543ec1a3a8c20635cf374110bd2f87c07374cf2c50617eee2c669b3ceeeaa9f", + "zh:d9ab41d556a48bd7059f0810cf020500635bfc696c9fc3adab5ea8915c1d886b", + "zh:d9e13427a7d011dbd654e591b0337e6074eef8c3b9bb11b2e39eaaf257044fd7", + "zh:f7605bd1437752114baf601bdf6931debe6dc6bfe3006eb7e9bb9080931dca8a", + ] +} diff --git a/x-pack/filebeat/input/awscloudwatch/_meta/terraform/README.md b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/README.md new file mode 100644 index 00000000000..5d9e4707a4a --- /dev/null +++ b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/README.md @@ -0,0 +1,46 @@ +# Terraform setup for AWS CloudWatch Input Integration Tests + +This directory contains a Terraform module that creates the AWS resources needed +for executing the integration tests for the `aws-cloudwatch` Filebeat input. It +creates two CloudWatch log groups, and one log stream under each log group. 
+ +It outputs configuration information that is consumed by the tests to +`outputs.yml`. The AWS resources are randomly named to prevent name collisions +between multiple users. + +### Usage + +You must have the appropriate AWS environment variables for authentication set +before running Terraform or the integration tests. The AWS key must be +authorized to create and destroy AWS CloudWatch log groups. + +1. Initialize a working directory containing Terraform configuration files. + + `terraform init` + +2. Execute terraform in this directory to create the resources. This will also + write the `outputs.yml`. You can use `export TF_VAR_aws_region=NNNNN` in order + to match the AWS region of the profile you are using. + + `terraform apply` + + +2. (Optional) View the output configuration. + + ```yaml + "aws_region": "us-east-1" + "log_group_name_1": "filebeat-cloudwatch-integtest-1-417koa" + "log_group_name_2": "filebeat-cloudwatch-integtest-2-417koa" + ``` + +3. Execute the integration test. + + ``` + cd x-pack/filebeat/input/awss3 + go test -tags aws,integration -run TestInputRun.+ -v . + ``` + +4. Cleanup AWS resources. Execute terraform to delete the log groups created for +testing. + + `terraform destroy` diff --git a/x-pack/filebeat/input/awscloudwatch/_meta/terraform/main.tf b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/main.tf new file mode 100644 index 00000000000..bb3b2459302 --- /dev/null +++ b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/main.tf @@ -0,0 +1,44 @@ +terraform { + required_providers { + aws = { + source = "hashicorp/aws" + version = "~> 3.52" + } + } +} + +provider "aws" { + region = var.aws_region +} + +resource "random_string" "random" { + length = 6 + special = false + upper = false +} + +resource "aws_cloudwatch_log_group" "filebeat-integtest-1" { + name = "filebeat-log-group-integtest-1-${random_string.random.result}" + + tags = { + Environment = "test" + } +} + +resource "aws_cloudwatch_log_group" "filebeat-integtest-2" { + name = "filebeat-log-group-integtest-2-${random_string.random.result}" + + tags = { + Environment = "test" + } +} + +resource "aws_cloudwatch_log_stream" "filebeat-integtest-1" { + name = "filebeat-log-stream-integtest-1-${random_string.random.result}" + log_group_name = aws_cloudwatch_log_group.filebeat-integtest-1.name +} + +resource "aws_cloudwatch_log_stream" "filebeat-integtest-2" { + name = "filebeat-log-stream-integtest-2-${random_string.random.result}" + log_group_name = aws_cloudwatch_log_group.filebeat-integtest-2.name +} diff --git a/x-pack/filebeat/input/awscloudwatch/_meta/terraform/outputs.tf b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/outputs.tf new file mode 100644 index 00000000000..09e0a07e4a9 --- /dev/null +++ b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/outputs.tf @@ -0,0 +1,11 @@ +resource "local_file" "secrets" { + content = yamlencode({ + "log_group_name_1" : aws_cloudwatch_log_group.filebeat-integtest-1.name + "log_group_name_2" : aws_cloudwatch_log_group.filebeat-integtest-2.name + "log_stream_name_1" : aws_cloudwatch_log_stream.filebeat-integtest-1.name + "log_stream_name_2" : aws_cloudwatch_log_stream.filebeat-integtest-2.name + "aws_region" : var.aws_region + }) + filename = "${path.module}/outputs.yml" + file_permission = "0644" +} diff --git a/x-pack/filebeat/input/awscloudwatch/_meta/terraform/variables.tf b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/variables.tf new file mode 100644 index 00000000000..2c4fb00786b --- /dev/null +++ 
b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/variables.tf @@ -0,0 +1,5 @@ +variable "aws_region" { + description = "AWS Region" + type = string + default = "us-east-1" +} diff --git a/x-pack/filebeat/input/awscloudwatch/cloudwatch.go b/x-pack/filebeat/input/awscloudwatch/cloudwatch.go new file mode 100644 index 00000000000..29c119117a0 --- /dev/null +++ b/x-pack/filebeat/input/awscloudwatch/cloudwatch.go @@ -0,0 +1,126 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package awscloudwatch + +import ( + "context" + "sync" + "time" + + awssdk "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" + "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/cloudwatchlogsiface" + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/monitoring" + "github.com/elastic/beats/v7/libbeat/statestore" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" +) + +type cloudwatchPoller struct { + numberOfWorkers int + apiSleep time.Duration + region string + logStreams []string + logStreamPrefix string + startTime int64 + endTime int64 + prevEndTime int64 + workerSem *awscommon.Sem + log *logp.Logger + metrics *inputMetrics + store *statestore.Store + workersListingMap *sync.Map + workersProcessingMap *sync.Map +} + +func newCloudwatchPoller(log *logp.Logger, metrics *inputMetrics, + store *statestore.Store, + awsRegion string, apiSleep time.Duration, + numberOfWorkers int, logStreams []string, logStreamPrefix string) *cloudwatchPoller { + if metrics == nil { + metrics = newInputMetrics(monitoring.NewRegistry(), "") + } + + return &cloudwatchPoller{ + numberOfWorkers: numberOfWorkers, + apiSleep: apiSleep, + region: awsRegion, + logStreams: logStreams, + logStreamPrefix: logStreamPrefix, + startTime: int64(0), + endTime: int64(0), + workerSem: awscommon.NewSem(numberOfWorkers), + log: log, + metrics: metrics, + store: store, + workersListingMap: new(sync.Map), + workersProcessingMap: new(sync.Map), + } +} + +func (p *cloudwatchPoller) run(svc cloudwatchlogsiface.ClientAPI, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) { + err := p.getLogEventsFromCloudWatch(svc, logGroup, startTime, endTime, logProcessor) + if err != nil { + var err *awssdk.RequestCanceledError + if errors.As(err, &err) { + p.log.Error("getLogEventsFromCloudWatch failed with RequestCanceledError: ", err) + } + p.log.Error("getLogEventsFromCloudWatch failed: ", err) + } +} + +// getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch +func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc cloudwatchlogsiface.ClientAPI, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) error { + // construct FilterLogEventsInput + filterLogEventsInput := p.constructFilterLogEventsInput(startTime, endTime, logGroup) + + // make API request + req := svc.FilterLogEventsRequest(filterLogEventsInput) + paginator := cloudwatchlogs.NewFilterLogEventsPaginator(req) + for paginator.Next(context.TODO()) { + page := paginator.CurrentPage() + p.metrics.apiCallsTotal.Inc() + + logEvents := page.Events + p.metrics.logEventsReceivedTotal.Add(uint64(len(logEvents))) + + // This sleep is to avoid hitting the FilterLogEvents 
API limit(5 transactions per second (TPS)/account/Region). + p.log.Debugf("sleeping for %v before making FilterLogEvents API call again", p.apiSleep) + time.Sleep(p.apiSleep) + p.log.Debug("done sleeping") + + p.log.Debugf("Processing #%v events", len(logEvents)) + err := logProcessor.processLogEvents(logEvents, logGroup, p.region) + if err != nil { + err = errors.Wrap(err, "processLogEvents failed") + p.log.Error(err) + } + } + + if err := paginator.Err(); err != nil { + return errors.Wrap(err, "error FilterLogEvents with Paginator") + } + return nil +} + +func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime int64, endTime int64, logGroup string) *cloudwatchlogs.FilterLogEventsInput { + filterLogEventsInput := &cloudwatchlogs.FilterLogEventsInput{ + LogGroupName: awssdk.String(logGroup), + StartTime: awssdk.Int64(startTime), + EndTime: awssdk.Int64(endTime), + Limit: awssdk.Int64(100), + } + + if len(p.logStreams) > 0 { + filterLogEventsInput.LogStreamNames = p.logStreams + } + + if p.logStreamPrefix != "" { + filterLogEventsInput.LogStreamNamePrefix = awssdk.String(p.logStreamPrefix) + } + return filterLogEventsInput +} diff --git a/x-pack/filebeat/input/awscloudwatch/config.go b/x-pack/filebeat/input/awscloudwatch/config.go index 3f04813e78c..0d8a225866c 100644 --- a/x-pack/filebeat/input/awscloudwatch/config.go +++ b/x-pack/filebeat/input/awscloudwatch/config.go @@ -25,7 +25,8 @@ type config struct { APITimeout time.Duration `config:"api_timeout" validate:"min=0,nonzero"` APISleep time.Duration `config:"api_sleep" validate:"min=0,nonzero"` Latency time.Duration `config:"latency"` - AwsConfig awscommon.ConfigAWS `config:",inline"` + NumberOfWorkers int `config:"number_of_workers"` + AWSConfig awscommon.ConfigAWS `config:",inline"` } func defaultConfig() config { @@ -33,10 +34,11 @@ func defaultConfig() config { ForwarderConfig: harvester.ForwarderConfig{ Type: "aws-cloudwatch", }, - StartPosition: "beginning", - ScanFrequency: 10 * time.Second, - APITimeout: 120 * time.Second, - APISleep: 200 * time.Millisecond, // FilterLogEvents has a limit of 5 transactions per second (TPS)/account/Region: 1s / 5 = 200 ms + StartPosition: "beginning", + ScanFrequency: 10 * time.Second, + APITimeout: 120 * time.Second, + APISleep: 200 * time.Millisecond, // FilterLogEvents has a limit of 5 transactions per second (TPS)/account/Region: 1s / 5 = 200 ms + NumberOfWorkers: 1, } } diff --git a/x-pack/filebeat/input/awscloudwatch/input.go b/x-pack/filebeat/input/awscloudwatch/input.go index 967c8102f03..d11afa77ff5 100644 --- a/x-pack/filebeat/input/awscloudwatch/input.go +++ b/x-pack/filebeat/input/awscloudwatch/input.go @@ -6,6 +6,7 @@ package awscloudwatch import ( "context" + "fmt" "strings" "sync" "time" @@ -16,71 +17,61 @@ import ( "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/cloudwatchlogsiface" "github.com/pkg/errors" - "github.com/elastic/beats/v7/filebeat/channel" - "github.com/elastic/beats/v7/filebeat/input" + "github.com/elastic/beats/v7/filebeat/beater" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" - "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/monitoring" awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" + 
"github.com/elastic/go-concert/unison" ) const ( inputName = "aws-cloudwatch" ) -func init() { - err := input.Register(inputName, NewInput) - if err != nil { - panic(errors.Wrapf(err, "failed to register %v input", inputName)) +func Plugin(store beater.StateStore) v2.Plugin { + return v2.Plugin{ + Name: inputName, + Stability: feature.Stable, + Deprecated: false, + Info: "Collect logs from cloudwatch", + Manager: &cloudwatchInputManager{store: store}, } } -// awsCloudWatchInput is a input for AWS CloudWatch logs -type awsCloudWatchInput struct { - config config - awsConfig awssdk.Config - - logger *logp.Logger - outlet channel.Outleter // Output of received aws-cloudwatch logs. - inputCtx *channelContext - - workerOnce sync.Once // Guarantees that the worker goroutine is only started once. - workerWg sync.WaitGroup // Waits on aws-cloudwatch worker goroutine. - stopOnce sync.Once - close chan struct{} - - prevEndTime int64 // track previous endTime for each iteration. +type cloudwatchInputManager struct { + store beater.StateStore } -// channelContext implements context.Context by wrapping a channel -type channelContext struct { - done <-chan struct{} +func (im *cloudwatchInputManager) Init(grp unison.Group, mode v2.Mode) error { + return nil } -func (c *channelContext) Deadline() (time.Time, bool) { return time.Time{}, false } -func (c *channelContext) Done() <-chan struct{} { return c.done } -func (c *channelContext) Err() error { - select { - case <-c.done: - return context.Canceled - default: - return nil +func (im *cloudwatchInputManager) Create(cfg *common.Config) (v2.Input, error) { + config := defaultConfig() + if err := cfg.Unpack(&config); err != nil { + return nil, err } + + return newInput(config, im.store) } -func (c *channelContext) Value(key interface{}) interface{} { return nil } -// NewInput creates a new aws-cloudwatch input -func NewInput(cfg *common.Config, connector channel.Connector, context input.Context) (input.Input, error) { - cfgwarn.Beta("aws-clouwatch input type is used") - logger := logp.NewLogger(inputName) +// cloudwatchInput is an input for reading logs from CloudWatch periodically. +type cloudwatchInput struct { + config config + awsConfig awssdk.Config + store beater.StateStore +} - // Extract and validate the input's configuration. 
- config := defaultConfig() - if err := cfg.Unpack(&config); err != nil { - return nil, errors.Wrap(err, "failed unpacking config") +func newInput(config config, store beater.StateStore) (*cloudwatchInput, error) { + cfgwarn.Beta("aws-cloudwatch input type is used") + awsConfig, err := awscommon.InitializeAWSConfig(config.AWSConfig) + if err != nil { + return nil, fmt.Errorf("failed to initialize AWS credentials: %w", err) } - logger.Debug("aws-cloudwatch input config = ", config) if config.LogGroupARN != "" { logGroupName, regionName, err := parseARN(config.LogGroupARN) @@ -92,81 +83,150 @@ func NewInput(cfg *common.Config, connector channel.Connector, context input.Con config.RegionName = regionName } - awsConfig, err := awscommon.InitializeAWSConfig(config.AwsConfig) + awsConfig, err = awscommon.InitializeAWSConfig(config.AWSConfig) if err != nil { return nil, errors.Wrap(err, "InitializeAWSConfig failed") } awsConfig.Region = config.RegionName - closeChannel := make(chan struct{}) - in := &awsCloudWatchInput{ - config: config, - awsConfig: awsConfig, - logger: logger, - close: closeChannel, - inputCtx: &channelContext{closeChannel}, - prevEndTime: int64(0), - } + return &cloudwatchInput{ + config: config, + awsConfig: awsConfig, + store: store, + }, nil +} - // Build outlet for events. - in.outlet, err = connector.Connect(cfg) - if err != nil { - return nil, err - } +func (in *cloudwatchInput) Name() string { return inputName } - in.logger.Info("Initialized AWS CloudWatch input.") - return in, nil +func (in *cloudwatchInput) Test(ctx v2.TestContext) error { + return nil } -// Run runs the input -func (in *awsCloudWatchInput) Run() { - // Please see https://docs.aws.amazon.com/general/latest/gr/cwl_region.html for more info on Amazon CloudWatch Logs endpoints. - logsServiceName := awscommon.CreateServiceName("logs", in.config.AwsConfig.FIPSEnabled, in.config.RegionName) - cwConfig := awscommon.EnrichAWSConfigWithEndpoint(in.config.AwsConfig.Endpoint, logsServiceName, in.config.RegionName, in.awsConfig) - svc := cloudwatchlogs.New(cwConfig) - - var logGroupNames []string +func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline) error { var err error - if in.config.LogGroupNamePrefix != "" { - logGroupNames, err = in.getLogGroupNames(svc) - if err != nil { - in.logger.Error("getLogGroupNames failed: ", err) - return + + persistentStore, err := in.store.Access() + if err != nil { + return fmt.Errorf("can not access persistent store: %w", err) + } + + defer persistentStore.Close() + + // Wrap input Context's cancellation Done channel a context.Context. This + // goroutine stops with the parent closes the Done channel. + ctx, cancelInputCtx := context.WithCancel(context.Background()) + go func() { + defer cancelInputCtx() + select { + case <-inputContext.Cancelation.Done(): + case <-ctx.Done(): } - } else { - logGroupNames = []string{in.config.LogGroupName} + }() + defer cancelInputCtx() + + // Create client for publishing events and receive notification of their ACKs. 
+ client, err := pipeline.ConnectWith(beat.ClientConfig{ + CloseRef: inputContext.Cancelation, + ACKHandler: awscommon.NewEventACKHandler(), + }) + if err != nil { + return fmt.Errorf("failed to create pipeline client: %w", err) } + defer client.Close() - for _, logGroup := range logGroupNames { - in.config.LogGroupName = logGroup - in.workerOnce.Do(func() { - in.workerWg.Add(1) - go func() { - in.logger.Infof("aws-cloudwatch input worker for log group: '%v' has started", in.config.LogGroupName) - defer in.logger.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", in.config.LogGroupName) - defer in.workerWg.Done() - in.run(svc) - }() - }) + logsServiceName := awscommon.CreateServiceName("logs", in.config.AWSConfig.FIPSEnabled, in.config.RegionName) + cwConfig := awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, logsServiceName, in.config.RegionName, in.awsConfig) + svc := cloudwatchlogs.New(cwConfig) + + logGroupNames, err := getLogGroupNames(svc, in.config.LogGroupNamePrefix, in.config.LogGroupName) + if err != nil { + return fmt.Errorf("failed to get log group names: %w", err) } + + log := inputContext.Logger + metricRegistry := monitoring.GetNamespace("dataset").GetRegistry() + metrics := newInputMetrics(metricRegistry, inputContext.ID) + cwPoller := newCloudwatchPoller( + log.Named("cloudwatch_poller"), + metrics, + persistentStore, + in.awsConfig.Region, + in.config.APISleep, + in.config.NumberOfWorkers, + in.config.LogStreams, + in.config.LogStreamPrefix) + logProcessor := newLogProcessor(log.Named("log_processor"), metrics, client, ctx) + cwPoller.metrics.logGroupsTotal.Add(uint64(len(logGroupNames))) + return in.Receive(svc, cwPoller, ctx, logProcessor, logGroupNames) } -func (in *awsCloudWatchInput) run(svc cloudwatchlogsiface.ClientAPI) { - for in.inputCtx.Err() == nil { - err := in.getLogEventsFromCloudWatch(svc) +func (in *cloudwatchInput) Receive(svc cloudwatchlogsiface.ClientAPI, cwPoller *cloudwatchPoller, ctx context.Context, logProcessor *logProcessor, logGroupNames []string) error { + // This loop tries to keep the workers busy as much as possible while + // honoring the number in config opposed to a simpler loop that does one + // listing, sequentially processes every object and then does another listing + start := true + workerWg := new(sync.WaitGroup) + lastLogGroupOffset := 0 + for ctx.Err() == nil { + if start == false { + cwPoller.log.Debugf("sleeping for %v before checking new logs", in.config.ScanFrequency) + time.Sleep(in.config.ScanFrequency) + cwPoller.log.Debug("done sleeping") + } + start = false + + currentTime := time.Now() + cwPoller.startTime, cwPoller.endTime = getStartPosition(in.config.StartPosition, currentTime, cwPoller.endTime, in.config.ScanFrequency, in.config.Latency) + cwPoller.log.Debugf("start_position = %s, startTime = %v, endTime = %v", in.config.StartPosition, time.Unix(cwPoller.startTime/1000, 0), time.Unix(cwPoller.endTime/1000, 0)) + availableWorkers, err := cwPoller.workerSem.AcquireContext(in.config.NumberOfWorkers, ctx) if err != nil { - var aerr *awssdk.RequestCanceledError - if errors.As(err, &aerr) { - continue - } - in.logger.Error("getLogEventsFromCloudWatch failed: ", err) + break + } + + if availableWorkers == 0 { continue } - in.logger.Debugf("sleeping for %v before checking new logs", in.config.ScanFrequency) - time.Sleep(in.config.ScanFrequency) - in.logger.Debug("done sleeping") + workerWg.Add(availableWorkers) + logGroupNamesLength := len(logGroupNames) + runningGoroutines := 0 + + for i 
:= lastLogGroupOffset; i < logGroupNamesLength; i++ { + if runningGoroutines >= availableWorkers { + break + } + + runningGoroutines++ + lastLogGroupOffset = i + 1 + if lastLogGroupOffset >= logGroupNamesLength { + // release unused workers + cwPoller.workerSem.Release(availableWorkers - runningGoroutines) + for j := 0; j < availableWorkers-runningGoroutines; j++ { + workerWg.Done() + } + lastLogGroupOffset = 0 + } + + lg := logGroupNames[i] + go func(logGroup string, startTime int64, endTime int64) { + defer func() { + cwPoller.log.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", logGroup) + workerWg.Done() + cwPoller.workerSem.Release(1) + }() + cwPoller.log.Infof("aws-cloudwatch input worker for log group: '%v' has started", logGroup) + cwPoller.run(svc, logGroup, startTime, endTime, logProcessor) + }(lg, cwPoller.startTime, cwPoller.endTime) + } + } + + // Wait for all workers to finish. + workerWg.Wait() + if errors.Is(ctx.Err(), context.Canceled) { + // A canceled context is a normal shutdown. + return nil } + return ctx.Err() } func parseARN(logGroupARN string) (string, string, error) { @@ -185,10 +245,14 @@ func parseARN(logGroupARN string) (string, string, error) { } // getLogGroupNames uses DescribeLogGroups API to retrieve all log group names -func (in *awsCloudWatchInput) getLogGroupNames(svc cloudwatchlogsiface.ClientAPI) ([]string, error) { +func getLogGroupNames(svc cloudwatchlogsiface.ClientAPI, logGroupNamePrefix string, logGroupName string) ([]string, error) { + if logGroupNamePrefix == "" { + return []string{logGroupName}, nil + } + // construct DescribeLogGroupsInput filterLogEventsInput := &cloudwatchlogs.DescribeLogGroupsInput{ - LogGroupNamePrefix: awssdk.String(in.config.LogGroupNamePrefix), + LogGroupNamePrefix: awssdk.String(logGroupNamePrefix), } // make API request @@ -197,75 +261,18 @@ func (in *awsCloudWatchInput) getLogGroupNames(svc cloudwatchlogsiface.ClientAPI var logGroupNames []string for p.Next(context.TODO()) { page := p.CurrentPage() - in.logger.Debugf("Collecting #%v log group names", len(page.LogGroups)) for _, lg := range page.LogGroups { logGroupNames = append(logGroupNames, *lg.LogGroupName) } } if err := p.Err(); err != nil { - in.logger.Error("failed DescribeLogGroupsRequest: ", err) return logGroupNames, err } return logGroupNames, nil } -// getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch -func (in *awsCloudWatchInput) getLogEventsFromCloudWatch(svc cloudwatchlogsiface.ClientAPI) error { - currentTime := time.Now() - startTime, endTime := getStartPosition(in.config.StartPosition, currentTime, in.prevEndTime, in.config.ScanFrequency, in.config.Latency) - in.logger.Debugf("start_position = %s, startTime = %v, endTime = %v", in.config.StartPosition, time.Unix(startTime/1000, 0), time.Unix(endTime/1000, 0)) - - // overwrite prevEndTime using new endTime - in.prevEndTime = endTime - - // construct FilterLogEventsInput - filterLogEventsInput := in.constructFilterLogEventsInput(startTime, endTime) - - // make API request - req := svc.FilterLogEventsRequest(filterLogEventsInput) - paginator := cloudwatchlogs.NewFilterLogEventsPaginator(req) - for paginator.Next(context.TODO()) { - page := paginator.CurrentPage() - - logEvents := page.Events - in.logger.Debugf("Processing #%v events", len(logEvents)) - err := in.processLogEvents(logEvents) - if err != nil { - err = errors.Wrap(err, "processLogEvents failed") - in.logger.Error(err) - } - } - - if err := paginator.Err(); err != nil { - return 
errors.Wrap(err, "error FilterLogEvents with Paginator") - } - - // This sleep is to avoid hitting the FilterLogEvents API limit(5 transactions per second (TPS)/account/Region). - in.logger.Debugf("sleeping for %v before making FilterLogEvents API call again", in.config.APISleep) - time.Sleep(in.config.APISleep) - in.logger.Debug("done sleeping") - return nil -} - -func (in *awsCloudWatchInput) constructFilterLogEventsInput(startTime int64, endTime int64) *cloudwatchlogs.FilterLogEventsInput { - filterLogEventsInput := &cloudwatchlogs.FilterLogEventsInput{ - LogGroupName: awssdk.String(in.config.LogGroupName), - StartTime: awssdk.Int64(startTime), - EndTime: awssdk.Int64(endTime), - } - - if len(in.config.LogStreams) > 0 { - filterLogEventsInput.LogStreamNames = in.config.LogStreams - } - - if in.config.LogStreamPrefix != "" { - filterLogEventsInput.LogStreamNamePrefix = awssdk.String(in.config.LogStreamPrefix) - } - return filterLogEventsInput -} - -func getStartPosition(startPosition string, currentTime time.Time, prevEndTime int64, scanFrequency time.Duration, latency time.Duration) (startTime int64, endTime int64) { +func getStartPosition(startPosition string, currentTime time.Time, endTime int64, scanFrequency time.Duration, latency time.Duration) (int64, int64) { if latency != 0 { // add latency if config is not 0 currentTime = currentTime.Add(latency * -1) @@ -273,77 +280,15 @@ func getStartPosition(startPosition string, currentTime time.Time, prevEndTime i switch startPosition { case "beginning": - if prevEndTime != int64(0) { - return prevEndTime, currentTime.UnixNano() / int64(time.Millisecond) + if endTime != int64(0) { + return endTime, currentTime.UnixNano() / int64(time.Millisecond) } return 0, currentTime.UnixNano() / int64(time.Millisecond) case "end": - if prevEndTime != int64(0) { - return prevEndTime, currentTime.UnixNano() / int64(time.Millisecond) + if endTime != int64(0) { + return endTime, currentTime.UnixNano() / int64(time.Millisecond) } return currentTime.Add(-scanFrequency).UnixNano() / int64(time.Millisecond), currentTime.UnixNano() / int64(time.Millisecond) } - return -} - -func (in *awsCloudWatchInput) processLogEvents(logEvents []cloudwatchlogs.FilteredLogEvent) error { - for _, logEvent := range logEvents { - event := createEvent(logEvent, in.config.LogGroupName, in.config.RegionName) - err := in.forwardEvent(event) - if err != nil { - err = errors.Wrap(err, "forwardEvent failed") - in.logger.Error(err) - return err - } - } - return nil -} - -func createEvent(logEvent cloudwatchlogs.FilteredLogEvent, logGroup string, regionName string) beat.Event { - event := beat.Event{ - Timestamp: time.Unix(*logEvent.Timestamp/1000, 0).UTC(), - Fields: common.MapStr{ - "message": *logEvent.Message, - "log.file.path": logGroup + "/" + *logEvent.LogStreamName, - "event": common.MapStr{ - "id": *logEvent.EventId, - "ingested": time.Now(), - }, - "awscloudwatch": common.MapStr{ - "log_group": logGroup, - "log_stream": *logEvent.LogStreamName, - "ingestion_time": time.Unix(*logEvent.IngestionTime/1000, 0), - }, - "cloud": common.MapStr{ - "provider": "aws", - "region": regionName, - }, - }, - } - event.SetID(*logEvent.EventId) - - return event -} - -func (in *awsCloudWatchInput) forwardEvent(event beat.Event) error { - ok := in.outlet.OnEvent(event) - if !ok { - return errors.New("OnEvent returned false. 
Stopping input worker") - } - return nil -} - -// Stop stops the aws-cloudwatch input -func (in *awsCloudWatchInput) Stop() { - in.stopOnce.Do(func() { - defer in.outlet.Close() - close(in.close) - in.logger.Info("Stopping aws-cloudwatch input") - }) -} - -// Wait is an alias for Stop. -func (in *awsCloudWatchInput) Wait() { - in.Stop() - in.workerWg.Wait() + return 0, 0 } diff --git a/x-pack/filebeat/input/awscloudwatch/input_integration_test.go b/x-pack/filebeat/input/awscloudwatch/input_integration_test.go new file mode 100644 index 00000000000..633a0ddcada --- /dev/null +++ b/x-pack/filebeat/input/awscloudwatch/input_integration_test.go @@ -0,0 +1,232 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// See _meta/terraform/README.md for integration test usage instructions. + +//go:build integration && aws +// +build integration,aws + +package awscloudwatch + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "golang.org/x/sync/errgroup" + "gopkg.in/yaml.v2" + + awssdk "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/aws/external" + "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" + "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/cloudwatchlogsiface" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/filebeat/beater" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/monitoring" + pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" + "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/beats/v7/libbeat/statestore/storetest" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" +) + +const ( + inputID = "test_id" + message1 = "test1" + message2 = "test2" + terraformOutputYML = "_meta/terraform/outputs.yml" + logGroupNamePrefix = "filebeat-log-group-integtest-" +) + +var ( + cloudwatchConfig = common.MapStr{ + "start_position": "beginning", + "scan_frequency": 10 * time.Second, + "api_timeout": 120 * time.Second, + "number_of_workers": 1, + } +) + +type terraformOutputData struct { + AWSRegion string `yaml:"aws_region"` + LogGroup1 string `yaml:"log_group_name_1"` + LogGroup2 string `yaml:"log_group_name_2"` + LogStream1 string `yaml:"log_stream_name_1"` + LogStream2 string `yaml:"log_stream_name_2"` +} + +func getTerraformOutputs(t *testing.T) terraformOutputData { + t.Helper() + + ymlData, err := ioutil.ReadFile(terraformOutputYML) + if os.IsNotExist(err) { + t.Skipf("Run 'terraform apply' in %v to setup CloudWatch log groups and log streams for the test.", filepath.Dir(terraformOutputYML)) + } + if err != nil { + t.Fatalf("failed reading terraform output data: %v", err) + } + + var rtn terraformOutputData + dec := yaml.NewDecoder(bytes.NewReader(ymlData)) + dec.SetStrict(true) + if err = dec.Decode(&rtn); err != nil { + t.Fatal(err) + } + + return rtn +} + +func assertMetric(t *testing.T, snapshot common.MapStr, name string, value interface{}) { + n, _ := snapshot.GetValue(inputID + "." 
+ name) + assert.EqualValues(t, value, n, name) +} + +func newV2Context() (v2.Context, func()) { + ctx, cancel := context.WithCancel(context.Background()) + return v2.Context{ + Logger: logp.NewLogger(inputName).With("id", inputID), + ID: inputID, + Cancelation: ctx, + }, cancel +} + +type testInputStore struct { + registry *statestore.Registry +} + +func openTestStatestore() beater.StateStore { + return &testInputStore{ + registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend()), + } +} + +func (s *testInputStore) Close() { + s.registry.Close() +} + +func (s *testInputStore) Access() (*statestore.Store, error) { + return s.registry.Get("filebeat") +} + +func (s *testInputStore) CleanupInterval() time.Duration { + return 24 * time.Hour +} + +func createInput(t *testing.T, cfg *common.Config) *cloudwatchInput { + inputV2, err := Plugin(openTestStatestore()).Manager.Create(cfg) + if err != nil { + t.Fatal(err) + } + + return inputV2.(*cloudwatchInput) +} + +func makeTestConfigWithLogGroupNamePrefix(regionName string) *common.Config { + return common.MustNewConfigFrom(fmt.Sprintf(`--- +log_group_name_prefix: %s +region_name: %s +`, logGroupNamePrefix, regionName)) +} + +func uploadLogMessage(t *testing.T, svc cloudwatchlogsiface.ClientAPI, message string, timestamp int64, logGroupName string, logStreamName string) { + describeLogStreamsInput := cloudwatchlogs.DescribeLogStreamsInput{ + LogGroupName: awssdk.String(logGroupName), + LogStreamNamePrefix: awssdk.String(logStreamName), + } + + reqDescribeLogStreams := svc.DescribeLogStreamsRequest(&describeLogStreamsInput) + resp, err := reqDescribeLogStreams.Send(context.TODO()) + if err != nil { + t.Fatalf("Failed to describe log stream %q in log group %q: %v", logStreamName, logGroupName, err) + } + + if len(resp.LogStreams) != 1 { + t.Fatalf("Describe log stream %q in log group %q should return 1 and only 1 value", logStreamName, logGroupName) + } + + inputLogEvent := cloudwatchlogs.InputLogEvent{ + Message: awssdk.String(message), + Timestamp: awssdk.Int64(timestamp), + } + + reqPutLogEvents := svc.PutLogEventsRequest( + &cloudwatchlogs.PutLogEventsInput{ + LogEvents: []cloudwatchlogs.InputLogEvent{inputLogEvent}, + LogGroupName: awssdk.String(logGroupName), + LogStreamName: awssdk.String(logStreamName), + SequenceToken: resp.LogStreams[0].UploadSequenceToken, + }) + _, err = reqPutLogEvents.Send(context.TODO()) + if err != nil { + t.Fatalf("Failed to upload message %q into log stream %q in log group %q: %v", message, logStreamName, logGroupName, err) + } +} + +func TestInputWithLogGroupNamePrefix(t *testing.T) { + logp.TestingSetup() + + // Terraform is used to set up S3 and SQS and must be executed manually. 
+ tfConfig := getTerraformOutputs(t) + + cfg, err := external.LoadDefaultAWSConfig() + if err != nil { + t.Fatal(err) + } + cfg.Region = tfConfig.AWSRegion + + // upload log messages for testing + svc := cloudwatchlogs.New(cfg) + currentTime := time.Now() + timestamp := currentTime.UnixNano() / int64(time.Millisecond) + + uploadLogMessage(t, svc, message1, timestamp, tfConfig.LogGroup1, tfConfig.LogStream1) + uploadLogMessage(t, svc, message2, timestamp, tfConfig.LogGroup2, tfConfig.LogStream2) + + // sleep for 30 seconds to wait for the log messages to show up + time.Sleep(30 * time.Second) + + cloudwatchInput := createInput(t, makeTestConfigWithLogGroupNamePrefix(tfConfig.AWSRegion)) + inputCtx, cancel := newV2Context() + t.Cleanup(cancel) + time.AfterFunc(30*time.Second, func() { + cancel() + }) + + client := pubtest.NewChanClient(0) + defer close(client.Channel) + go func() { + for event := range client.Channel { + // Fake the ACK handling that's not implemented in pubtest. + event.Private.(*awscommon.EventACKTracker).ACK() + } + }() + + var errGroup errgroup.Group + errGroup.Go(func() error { + pipeline := pubtest.PublisherWithClient(client) + return cloudwatchInput.Run(inputCtx, pipeline) + }) + + if err := errGroup.Wait(); err != nil { + t.Fatal(err) + } + + snap := common.MapStr(monitoring.CollectStructSnapshot( + monitoring.GetNamespace("dataset").GetRegistry(), + monitoring.Full, + false)) + t.Log(snap.StringToPrint()) + + assertMetric(t, snap, "log_events_received_total", 2) + assertMetric(t, snap, "log_groups_total", 2) + assertMetric(t, snap, "cloudwatch_events_created_total", 2) +} diff --git a/x-pack/filebeat/input/awscloudwatch/input_test.go b/x-pack/filebeat/input/awscloudwatch/input_test.go index 7d8b45f7d44..c094a1cddb5 100644 --- a/x-pack/filebeat/input/awscloudwatch/input_test.go +++ b/x-pack/filebeat/input/awscloudwatch/input_test.go @@ -15,7 +15,6 @@ import ( "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/cloudwatchlogsiface" - "github.com/elastic/beats/v7/filebeat/input/inputtest" "github.com/elastic/beats/v7/libbeat/common" ) @@ -197,11 +196,3 @@ func TestParseARN(t *testing.T) { assert.Equal(t, "us-east-1", regionName) assert.NoError(t, err) } - -func TestNewInputDone(t *testing.T) { - config := common.MapStr{ - "log_group_name": "some-group", - "region_name": "eu-west-1", - } - inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) -} diff --git a/x-pack/filebeat/input/awscloudwatch/metrics.go b/x-pack/filebeat/input/awscloudwatch/metrics.go new file mode 100644 index 00000000000..8d53ec5700c --- /dev/null +++ b/x-pack/filebeat/input/awscloudwatch/metrics.go @@ -0,0 +1,39 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package awscloudwatch + +import ( + "github.com/elastic/beats/v7/libbeat/monitoring" +) + +type inputMetrics struct { + id string // Input ID. + parent *monitoring.Registry // Parent registry holding this input's ID as a key. + + logEventsReceivedTotal *monitoring.Uint // Number of CloudWatch log events received. + logGroupsTotal *monitoring.Uint // Logs collected from number of CloudWatch log groups. + cloudwatchEventsCreatedTotal *monitoring.Uint // Number of events created from processing logs from CloudWatch. 
+ apiCallsTotal *monitoring.Uint // Number of API calls made total. +} + +// Close removes the metrics from the registry. +func (m *inputMetrics) Close() { + m.parent.Remove(m.id) +} + +func newInputMetrics(parent *monitoring.Registry, id string) *inputMetrics { + reg := parent.NewRegistry(id) + monitoring.NewString(reg, "input").Set(inputName) + monitoring.NewString(reg, "id").Set(id) + out := &inputMetrics{ + id: id, + parent: reg, + logEventsReceivedTotal: monitoring.NewUint(reg, "log_events_received_total"), + logGroupsTotal: monitoring.NewUint(reg, "log_groups_total"), + cloudwatchEventsCreatedTotal: monitoring.NewUint(reg, "cloudwatch_events_created_total"), + apiCallsTotal: monitoring.NewUint(reg, "api_calls_total"), + } + return out +} diff --git a/x-pack/filebeat/input/awscloudwatch/processor.go b/x-pack/filebeat/input/awscloudwatch/processor.go new file mode 100644 index 00000000000..558e91d5da5 --- /dev/null +++ b/x-pack/filebeat/input/awscloudwatch/processor.go @@ -0,0 +1,78 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package awscloudwatch + +import ( + "context" + "time" + + "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/monitoring" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" +) + +type logProcessor struct { + log *logp.Logger + metrics *inputMetrics + publisher beat.Client + ack *awscommon.EventACKTracker +} + +func newLogProcessor(log *logp.Logger, metrics *inputMetrics, publisher beat.Client, ctx context.Context) *logProcessor { + if metrics == nil { + metrics = newInputMetrics(monitoring.NewRegistry(), "") + } + return &logProcessor{ + log: log, + metrics: metrics, + publisher: publisher, + ack: awscommon.NewEventACKTracker(ctx), + } +} + +func (p *logProcessor) processLogEvents(logEvents []cloudwatchlogs.FilteredLogEvent, logGroup string, regionName string) error { + for _, logEvent := range logEvents { + event := createEvent(logEvent, logGroup, regionName) + p.publish(p.ack, &event) + } + return nil +} + +func (p *logProcessor) publish(ack *awscommon.EventACKTracker, event *beat.Event) { + ack.Add() + event.Private = ack + p.metrics.cloudwatchEventsCreatedTotal.Inc() + p.publisher.Publish(*event) +} + +func createEvent(logEvent cloudwatchlogs.FilteredLogEvent, logGroup string, regionName string) beat.Event { + event := beat.Event{ + Timestamp: time.Unix(*logEvent.Timestamp/1000, 0).UTC(), + Fields: common.MapStr{ + "message": *logEvent.Message, + "log.file.path": logGroup + "/" + *logEvent.LogStreamName, + "event": common.MapStr{ + "id": *logEvent.EventId, + "ingested": time.Now(), + }, + "awscloudwatch": common.MapStr{ + "log_group": logGroup, + "log_stream": *logEvent.LogStreamName, + "ingestion_time": time.Unix(*logEvent.IngestionTime/1000, 0), + }, + "cloud": common.MapStr{ + "provider": "aws", + "region": regionName, + }, + }, + } + event.SetID(*logEvent.EventId) + + return event +} diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 8d673cabbac..e1558b552a0 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -111,7 +111,7 @@ func 
(in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { // Create client for publishing events and receive notification of their ACKs. client, err := pipeline.ConnectWith(beat.ClientConfig{ CloseRef: inputContext.Cancelation, - ACKHandler: newEventACKHandler(), + ACKHandler: awscommon.NewEventACKHandler(), }) if err != nil { return fmt.Errorf("failed to create pipeline client: %w", err) diff --git a/x-pack/filebeat/input/awss3/input_benchmark_test.go b/x-pack/filebeat/input/awss3/input_benchmark_test.go index ecdc1756ce4..ec7068bb733 100644 --- a/x-pack/filebeat/input/awss3/input_benchmark_test.go +++ b/x-pack/filebeat/input/awss3/input_benchmark_test.go @@ -15,7 +15,6 @@ import ( "time" "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/dustin/go-humanize" @@ -29,6 +28,7 @@ import ( pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/beats/v7/libbeat/statestore/storetest" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" ) const cloudtrailTestFile = "testdata/aws-cloudtrail.json.gz" @@ -172,7 +172,7 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR go func() { for event := range client.Channel { // Fake the ACK handling that's not implemented in pubtest. - event.Private.(*eventACKTracker).ACK() + event.Private.(*awscommon.EventACKTracker).ACK() } }() @@ -259,7 +259,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult s3API := newConstantS3(t) s3API.pagerConstant = newS3PagerConstant() client := pubtest.NewChanClientWithCallback(100, func(event beat.Event) { - event.Private.(*eventACKTracker).ACK() + event.Private.(*awscommon.EventACKTracker).ACK() }) defer close(client.Channel) diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index a7f4f651c07..d112e6a4c35 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -211,7 +211,7 @@ func TestInputRunSQS(t *testing.T) { go func() { for event := range client.Channel { // Fake the ACK handling that's not implemented in pubtest. - event.Private.(*eventACKTracker).ACK() + event.Private.(*awscommon.EventACKTracker).ACK() } }() @@ -274,7 +274,7 @@ func TestInputRunS3(t *testing.T) { go func() { for event := range client.Channel { // Fake the ACK handling that's not implemented in pubtest. 
- event.Private.(*eventACKTracker).ACK() + event.Private.(*awscommon.EventACKTracker).ACK() } }() @@ -479,7 +479,7 @@ func TestInputRunSNS(t *testing.T) { defer close(client.Channel) go func() { for event := range client.Channel { - event.Private.(*eventACKTracker).ACK() + event.Private.(*awscommon.EventACKTracker).ACK() } }() diff --git a/x-pack/filebeat/input/awss3/interfaces.go b/x-pack/filebeat/input/awss3/interfaces.go index c777072c6c9..1cd1dbf807b 100644 --- a/x-pack/filebeat/input/awss3/interfaces.go +++ b/x-pack/filebeat/input/awss3/interfaces.go @@ -10,6 +10,8 @@ import ( "fmt" "time" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" + awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/sqs" @@ -79,7 +81,7 @@ type s3ObjectHandlerFactory interface { // Create returns a new s3ObjectHandler that can be used to process the // specified S3 object. If the handler is not configured to process the // given S3 object (based on key name) then it will return nil. - Create(ctx context.Context, log *logp.Logger, acker *eventACKTracker, obj s3EventV2) s3ObjectHandler + Create(ctx context.Context, log *logp.Logger, acker *awscommon.EventACKTracker, obj s3EventV2) s3ObjectHandler } type s3ObjectHandler interface { diff --git a/x-pack/filebeat/input/awss3/mock_interfaces_test.go b/x-pack/filebeat/input/awss3/mock_interfaces_test.go index 85c11e0fe80..d315258d177 100644 --- a/x-pack/filebeat/input/awss3/mock_interfaces_test.go +++ b/x-pack/filebeat/input/awss3/mock_interfaces_test.go @@ -18,6 +18,7 @@ import ( gomock "github.com/golang/mock/gomock" logp "github.com/elastic/beats/v7/libbeat/logp" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" ) // MockSQSAPI is a mock of sqsAPI interface. @@ -451,7 +452,7 @@ func (m *MockS3ObjectHandlerFactory) EXPECT() *MockS3ObjectHandlerFactoryMockRec } // Create mocks base method. 
-func (m *MockS3ObjectHandlerFactory) Create(ctx context.Context, log *logp.Logger, acker *eventACKTracker, obj s3EventV2) s3ObjectHandler { +func (m *MockS3ObjectHandlerFactory) Create(ctx context.Context, log *logp.Logger, acker *awscommon.EventACKTracker, obj s3EventV2) s3ObjectHandler { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Create", ctx, log, acker, obj) ret0, _ := ret[0].(s3ObjectHandler) diff --git a/x-pack/filebeat/input/awss3/s3.go b/x-pack/filebeat/input/awss3/s3.go index 1688ca7ebc8..aa6e7be8012 100644 --- a/x-pack/filebeat/input/awss3/s3.go +++ b/x-pack/filebeat/input/awss3/s3.go @@ -17,6 +17,7 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/libbeat/statestore" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" "github.com/elastic/go-concert/timed" ) @@ -44,7 +45,7 @@ type s3Poller struct { region string provider string bucketPollInterval time.Duration - workerSem *sem + workerSem *awscommon.Sem s3 s3API log *logp.Logger metrics *inputMetrics @@ -77,7 +78,7 @@ func newS3Poller(log *logp.Logger, region: awsRegion, provider: provider, bucketPollInterval: bucketPollInterval, - workerSem: newSem(numberOfWorkers), + workerSem: awscommon.NewSem(numberOfWorkers), s3: s3, log: log, metrics: metrics, @@ -191,7 +192,7 @@ func (p *s3Poller) GetS3Objects(ctx context.Context, s3ObjectPayloadChan chan<- event.S3.Bucket.ARN = p.bucket event.S3.Object.Key = filename - acker := newEventACKTracker(ctx) + acker := awscommon.NewEventACKTracker(ctx) s3Processor := p.s3ObjectHandler.Create(ctx, p.log, acker, event) if s3Processor == nil { diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go index ebe1a5f0828..f2cad661567 100644 --- a/x-pack/filebeat/input/awss3/s3_objects.go +++ b/x-pack/filebeat/input/awss3/s3_objects.go @@ -19,6 +19,8 @@ import ( "strings" "time" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/pkg/errors" @@ -73,7 +75,7 @@ func (f *s3ObjectProcessorFactory) findReaderConfig(key string) *readerConfig { // Create returns a new s3ObjectProcessor. It returns nil when no file selectors // match the S3 object key. -func (f *s3ObjectProcessorFactory) Create(ctx context.Context, log *logp.Logger, ack *eventACKTracker, obj s3EventV2) s3ObjectHandler { +func (f *s3ObjectProcessorFactory) Create(ctx context.Context, log *logp.Logger, ack *awscommon.EventACKTracker, obj s3EventV2) s3ObjectHandler { log = log.With( "bucket_arn", obj.S3.Bucket.Name, "object_key", obj.S3.Object.Key) @@ -100,9 +102,9 @@ type s3ObjectProcessor struct { log *logp.Logger ctx context.Context - acker *eventACKTracker // ACKer tied to the SQS message (multiple S3 readers share an ACKer when the S3 notification event contains more than one S3 object). - readerConfig *readerConfig // Config about how to process the object. - s3Obj s3EventV2 // S3 object information. + acker *awscommon.EventACKTracker // ACKer tied to the SQS message (multiple S3 readers share an ACKer when the S3 notification event contains more than one S3 object). + readerConfig *readerConfig // Config about how to process the object. + s3Obj s3EventV2 // S3 object information. 
s3ObjHash string s3RequestURL string @@ -313,7 +315,7 @@ func (p *s3ObjectProcessor) readFile(r io.Reader) error { return nil } -func (p *s3ObjectProcessor) publish(ack *eventACKTracker, event *beat.Event) { +func (p *s3ObjectProcessor) publish(ack *awscommon.EventACKTracker, event *beat.Event) { ack.Add() event.Private = ack p.metrics.s3EventsCreatedTotal.Inc() diff --git a/x-pack/filebeat/input/awss3/s3_objects_test.go b/x-pack/filebeat/input/awss3/s3_objects_test.go index 952fbb757dc..4ab3edfaa4b 100644 --- a/x-pack/filebeat/input/awss3/s3_objects_test.go +++ b/x-pack/filebeat/input/awss3/s3_objects_test.go @@ -24,6 +24,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" ) func newS3Object(t testing.TB, filename, contentType string) (s3EventV2, *s3.GetObjectResponse) { @@ -162,7 +163,7 @@ func TestS3ObjectProcessor(t *testing.T) { Return(nil, errFakeConnectivityFailure) s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, mockPublisher, nil) - ack := newEventACKTracker(ctx) + ack := awscommon.NewEventACKTracker(ctx) err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), ack, s3Event).ProcessS3Object() require.Error(t, err) assert.True(t, errors.Is(err, errFakeConnectivityFailure), "expected errFakeConnectivityFailure error") @@ -184,7 +185,7 @@ func TestS3ObjectProcessor(t *testing.T) { Return(nil, nil) s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, mockPublisher, nil) - ack := newEventACKTracker(ctx) + ack := awscommon.NewEventACKTracker(ctx) err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), ack, s3Event).ProcessS3Object() require.Error(t, err) }) @@ -211,7 +212,7 @@ func TestS3ObjectProcessor(t *testing.T) { ) s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, mockPublisher, nil) - ack := newEventACKTracker(ctx) + ack := awscommon.NewEventACKTracker(ctx) err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), ack, s3Event).ProcessS3Object() require.NoError(t, err) }) @@ -249,13 +250,13 @@ func _testProcessS3Object(t testing.TB, file, contentType string, numEvents int, ) s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, mockPublisher, selectors) - ack := newEventACKTracker(ctx) + ack := awscommon.NewEventACKTracker(ctx) err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), ack, s3Event).ProcessS3Object() if !expectErr { require.NoError(t, err) assert.Equal(t, numEvents, len(events)) - assert.EqualValues(t, numEvents, ack.pendingACKs) + assert.EqualValues(t, numEvents, ack.PendingACKs) } else { require.Error(t, err) } diff --git a/x-pack/filebeat/input/awss3/s3_test.go b/x-pack/filebeat/input/awss3/s3_test.go index b41349c1c8b..ef39e085e1f 100644 --- a/x-pack/filebeat/input/awss3/s3_test.go +++ b/x-pack/filebeat/input/awss3/s3_test.go @@ -9,18 +9,15 @@ import ( "testing" "time" - "github.com/elastic/beats/v7/libbeat/statestore" - "github.com/elastic/beats/v7/libbeat/statestore/storetest" - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/statestore" + 
"github.com/elastic/beats/v7/libbeat/statestore/storetest" ) func TestS3Poller(t *testing.T) { @@ -135,7 +132,7 @@ func TestS3Poller(t *testing.T) { s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, mockPublisher, nil) receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, "key", "region", "provider", numberOfWorkers, pollInterval) require.Error(t, context.DeadlineExceeded, receiver.Poll(ctx)) - assert.Equal(t, numberOfWorkers, receiver.workerSem.available) + assert.Equal(t, numberOfWorkers, receiver.workerSem.Available()) }) t.Run("retry after Poll error", func(t *testing.T) { @@ -265,6 +262,6 @@ func TestS3Poller(t *testing.T) { s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, mockPublisher, nil) receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, "key", "region", "provider", numberOfWorkers, pollInterval) require.Error(t, context.DeadlineExceeded, receiver.Poll(ctx)) - assert.Equal(t, numberOfWorkers, receiver.workerSem.available) + assert.Equal(t, numberOfWorkers, receiver.workerSem.Available()) }) } diff --git a/x-pack/filebeat/input/awss3/sqs.go b/x-pack/filebeat/input/awss3/sqs.go index 1f13ec010cf..f1fc7588e37 100644 --- a/x-pack/filebeat/input/awss3/sqs.go +++ b/x-pack/filebeat/input/awss3/sqs.go @@ -14,6 +14,7 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" "github.com/elastic/go-concert/timed" ) @@ -23,7 +24,7 @@ const ( type sqsReader struct { maxMessagesInflight int - workerSem *sem + workerSem *awscommon.Sem sqs sqsAPI msgHandler sqsProcessor log *logp.Logger @@ -36,7 +37,7 @@ func newSQSReader(log *logp.Logger, metrics *inputMetrics, sqs sqsAPI, maxMessag } return &sqsReader{ maxMessagesInflight: maxMessagesInflight, - workerSem: newSem(maxMessagesInflight), + workerSem: awscommon.NewSem(maxMessagesInflight), sqs: sqs, msgHandler: msgHandler, log: log, diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event.go b/x-pack/filebeat/input/awss3/sqs_s3_event.go index d1865aec9cd..c17efd87d53 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event.go @@ -14,6 +14,8 @@ import ( "sync" "time" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" + "github.com/aws/aws-sdk-go-v2/aws/awserr" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/pkg/errors" @@ -275,7 +277,7 @@ func (p *sqsS3EventProcessor) processS3Events(ctx context.Context, log *logp.Log defer log.Debug("End processing SQS S3 event notifications.") // Wait for all events to be ACKed before proceeding. 
- acker := newEventACKTracker(ctx) + acker := awscommon.NewEventACKTracker(ctx) defer acker.Wait() var errs []error diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go index 6100dbe3119..fddfb3d0e74 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go @@ -19,6 +19,7 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/logp" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" "github.com/elastic/go-concert/timed" ) @@ -104,7 +105,7 @@ func TestSQSS3EventProcessor(t *testing.T) { gomock.InOrder( mockS3HandlerFactory.EXPECT().Create(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Do(func(ctx context.Context, _ *logp.Logger, _ *eventACKTracker, _ s3EventV2) { + Do(func(ctx context.Context, _ *logp.Logger, _ *awscommon.EventACKTracker, _ s3EventV2) { timed.Wait(ctx, 5*visibilityTimeout) }).Return(mockS3Handler), mockS3Handler.EXPECT().ProcessS3Object().Return(nil), diff --git a/x-pack/filebeat/input/awss3/sqs_test.go b/x-pack/filebeat/input/awss3/sqs_test.go index a8b6e7b5f2a..a2414736198 100644 --- a/x-pack/filebeat/input/awss3/sqs_test.go +++ b/x-pack/filebeat/input/awss3/sqs_test.go @@ -70,7 +70,7 @@ func TestSQSReceiver(t *testing.T) { // Execute sqsReader and verify calls/state. receiver := newSQSReader(logp.NewLogger(inputName), nil, mockAPI, maxMessages, mockMsgHandler) require.NoError(t, receiver.Receive(ctx)) - assert.Equal(t, maxMessages, receiver.workerSem.available) + assert.Equal(t, maxMessages, receiver.workerSem.Available()) }) t.Run("retry after ReceiveMessage error", func(t *testing.T) { @@ -103,7 +103,7 @@ func TestSQSReceiver(t *testing.T) { // Execute SQSReceiver and verify calls/state. receiver := newSQSReader(logp.NewLogger(inputName), nil, mockAPI, maxMessages, mockMsgHandler) require.NoError(t, receiver.Receive(ctx)) - assert.Equal(t, maxMessages, receiver.workerSem.available) + assert.Equal(t, maxMessages, receiver.workerSem.Available()) }) } diff --git a/x-pack/filebeat/input/default-inputs/inputs_other.go b/x-pack/filebeat/input/default-inputs/inputs_other.go index c31106c3baa..5a268fa4758 100644 --- a/x-pack/filebeat/input/default-inputs/inputs_other.go +++ b/x-pack/filebeat/input/default-inputs/inputs_other.go @@ -12,6 +12,7 @@ import ( v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/awscloudwatch" "github.com/elastic/beats/v7/x-pack/filebeat/input/awss3" "github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry" "github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint" @@ -26,5 +27,6 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2 httpjson.Plugin(log, store), o365audit.Plugin(log, store), awss3.Plugin(store), + awscloudwatch.Plugin(store), } } diff --git a/x-pack/filebeat/input/awss3/acker.go b/x-pack/libbeat/common/aws/acker.go similarity index 75% rename from x-pack/filebeat/input/awss3/acker.go rename to x-pack/libbeat/common/aws/acker.go index db88c23f7d1..347347dde67 100644 --- a/x-pack/filebeat/input/awss3/acker.go +++ b/x-pack/libbeat/common/aws/acker.go @@ -2,7 +2,7 @@ // or more contributor license agreements. 
Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package awss3 +package aws import ( "context" @@ -12,40 +12,40 @@ import ( "github.com/elastic/beats/v7/libbeat/common/acker" ) -// eventACKTracker tracks the publishing state of S3 objects. Specifically +// EventACKTracker tracks the publishing state of S3 objects. Specifically // it tracks the number of message acknowledgements that are pending from the // output. It can be used to wait until all ACKs have been received for one or // more S3 objects. -type eventACKTracker struct { +type EventACKTracker struct { sync.Mutex - pendingACKs int64 + PendingACKs int64 ctx context.Context cancel context.CancelFunc } -func newEventACKTracker(ctx context.Context) *eventACKTracker { +func NewEventACKTracker(ctx context.Context) *EventACKTracker { ctx, cancel := context.WithCancel(ctx) - return &eventACKTracker{ctx: ctx, cancel: cancel} + return &EventACKTracker{ctx: ctx, cancel: cancel} } // Add increments the number of pending ACKs. -func (a *eventACKTracker) Add() { +func (a *EventACKTracker) Add() { a.Lock() - a.pendingACKs++ + a.PendingACKs++ a.Unlock() } // ACK decrements the number of pending ACKs. -func (a *eventACKTracker) ACK() { +func (a *EventACKTracker) ACK() { a.Lock() defer a.Unlock() - if a.pendingACKs <= 0 { + if a.PendingACKs <= 0 { panic("misuse detected: negative ACK counter") } - a.pendingACKs-- - if a.pendingACKs == 0 { + a.PendingACKs-- + if a.PendingACKs == 0 { a.cancel() } } @@ -55,11 +55,11 @@ func (a *eventACKTracker) ACK() { // `Add` calls are made. Failing to do so could reset the pendingACKs // property to 0 and would result in Wait returning after additional // calls to `Add` are made without a corresponding `ACK` call. -func (a *eventACKTracker) Wait() { +func (a *EventACKTracker) Wait() { // If there were never any pending ACKs then cancel the context. (This can // happen when a document contains no events or cannot be read due to an error). a.Lock() - if a.pendingACKs == 0 { + if a.PendingACKs == 0 { a.cancel() } a.Unlock() @@ -68,15 +68,15 @@ func (a *eventACKTracker) Wait() { <-a.ctx.Done() } -// newEventACKHandler returns a beat ACKer that can receive callbacks when +// NewEventACKHandler returns a beat ACKer that can receive callbacks when // an event has been ACKed by an output. If the event contains private metadata // pointing to an eventACKTracker then it will invoke the tracker's ACK() method // to decrement the number of pending ACKs. -func newEventACKHandler() beat.ACKer { +func NewEventACKHandler() beat.ACKer { return acker.ConnectionOnly( acker.EventPrivateReporter(func(_ int, privates []interface{}) { for _, private := range privates { - if ack, ok := private.(*eventACKTracker); ok { + if ack, ok := private.(*EventACKTracker); ok { ack.ACK() } } diff --git a/x-pack/filebeat/input/awss3/acker_test.go b/x-pack/libbeat/common/aws/acker_test.go similarity index 79% rename from x-pack/filebeat/input/awss3/acker_test.go rename to x-pack/libbeat/common/aws/acker_test.go index a038e8a39e4..3c470f0b922 100644 --- a/x-pack/filebeat/input/awss3/acker_test.go +++ b/x-pack/libbeat/common/aws/acker_test.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. 
-package awss3 +package aws import ( "context" @@ -17,11 +17,11 @@ func TestEventACKTracker(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - acker := newEventACKTracker(ctx) + acker := NewEventACKTracker(ctx) acker.Add() acker.ACK() - assert.EqualValues(t, 0, acker.pendingACKs) + assert.EqualValues(t, 0, acker.PendingACKs) assert.ErrorIs(t, acker.ctx.Err(), context.Canceled) } @@ -29,10 +29,10 @@ func TestEventACKTrackerNoACKs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - acker := newEventACKTracker(ctx) + acker := NewEventACKTracker(ctx) acker.Wait() - assert.EqualValues(t, 0, acker.pendingACKs) + assert.EqualValues(t, 0, acker.PendingACKs) assert.ErrorIs(t, acker.ctx.Err(), context.Canceled) } @@ -41,15 +41,15 @@ func TestEventACKHandler(t *testing.T) { t.Cleanup(cancel) // Create acker. Add one pending ACK. - acker := newEventACKTracker(ctx) + acker := NewEventACKTracker(ctx) acker.Add() // Create an ACK handler and simulate one ACKed event. - ackHandler := newEventACKHandler() + ackHandler := NewEventACKHandler() ackHandler.AddEvent(beat.Event{Private: acker}, true) ackHandler.ACKEvents(1) - assert.EqualValues(t, 0, acker.pendingACKs) + assert.EqualValues(t, 0, acker.PendingACKs) assert.ErrorIs(t, acker.ctx.Err(), context.Canceled) } @@ -58,12 +58,12 @@ func TestEventACKHandlerWait(t *testing.T) { t.Cleanup(cancel) // Create acker. Add one pending ACK. - acker := newEventACKTracker(ctx) + acker := NewEventACKTracker(ctx) acker.Add() acker.ACK() acker.Wait() acker.Add() - assert.EqualValues(t, 1, acker.pendingACKs) + assert.EqualValues(t, 1, acker.PendingACKs) assert.ErrorIs(t, acker.ctx.Err(), context.Canceled) } diff --git a/x-pack/filebeat/input/awss3/semaphore.go b/x-pack/libbeat/common/aws/semaphore.go similarity index 81% rename from x-pack/filebeat/input/awss3/semaphore.go rename to x-pack/libbeat/common/aws/semaphore.go index 2a695f4c621..28343bcbd32 100644 --- a/x-pack/filebeat/input/awss3/semaphore.go +++ b/x-pack/libbeat/common/aws/semaphore.go @@ -2,22 +2,22 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. 
-package awss3 +package aws import ( "context" "sync" ) -type sem struct { +type Sem struct { mutex *sync.Mutex cond sync.Cond available int } -func newSem(n int) *sem { +func NewSem(n int) *Sem { var m sync.Mutex - return &sem{ + return &Sem{ available: n, mutex: &m, cond: sync.Cond{ @@ -26,7 +26,7 @@ func newSem(n int) *sem { } } -func (s *sem) AcquireContext(n int, ctx context.Context) (int, error) { +func (s *Sem) AcquireContext(n int, ctx context.Context) (int, error) { acquireC := make(chan int, 1) go func() { defer close(acquireC) @@ -41,7 +41,7 @@ func (s *sem) AcquireContext(n int, ctx context.Context) (int, error) { } } -func (s *sem) Acquire(n int) int { +func (s *Sem) Acquire(n int) int { if n <= 0 { return 0 } @@ -63,7 +63,7 @@ func (s *sem) Acquire(n int) int { return n } -func (s *sem) Release(n int) { +func (s *Sem) Release(n int) { if n <= 0 { return } @@ -75,7 +75,7 @@ func (s *sem) Release(n int) { s.cond.Signal() } -func (s *sem) Available() int { +func (s *Sem) Available() int { s.mutex.Lock() defer s.mutex.Unlock() diff --git a/x-pack/filebeat/input/awss3/semaphore_test.go b/x-pack/libbeat/common/aws/semaphore_test.go similarity index 95% rename from x-pack/filebeat/input/awss3/semaphore_test.go rename to x-pack/libbeat/common/aws/semaphore_test.go index d71252ffc78..f91831ef8a0 100644 --- a/x-pack/filebeat/input/awss3/semaphore_test.go +++ b/x-pack/libbeat/common/aws/semaphore_test.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package awss3 +package aws import ( "sync" @@ -12,7 +12,7 @@ import ( ) func TestSemaphore(t *testing.T) { - s := newSem(5) + s := NewSem(5) assert.Equal(t, s.Acquire(5), 5)
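For readers unfamiliar with the relocated helpers, the short Go sketch below shows how a consumer (such as the new awscloudwatch input) could use the shared x-pack/libbeat/common/aws package. It is illustrative only and not part of this patch; it relies solely on the exported API visible in the hunks above (NewSem, Acquire, Release, Available, NewEventACKTracker, Add, ACK, Wait), and the main function and literal values are hypothetical.

// Illustrative sketch, assuming only the exported names shown in the diff above.
package main

import (
	"context"
	"fmt"

	awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
)

func main() {
	ctx := context.Background()

	// Bound the number of concurrent workers with the shared semaphore.
	sem := awscommon.NewSem(2)
	granted := sem.Acquire(2) // acquires up to 2 slots, blocking until at least one is free
	defer sem.Release(granted)

	// Track pending output ACKs for published events; Wait returns once every
	// Add has a matching ACK (or immediately when nothing was ever added).
	acker := awscommon.NewEventACKTracker(ctx)
	acker.Add()
	acker.ACK()
	acker.Wait()

	fmt.Println("available worker slots:", sem.Available())
}

A beat.ACKer produced by awscommon.NewEventACKHandler() would normally be attached to the pipeline client so that delivered events carrying an *EventACKTracker in their Private field are acknowledged automatically, as the handler code above shows.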