Support running multiple log groups in cloudwatch input (#29695)
kaiyan-sheng authored Jan 25, 2022
1 parent a84302d commit ea8f10c
Showing 33 changed files with 907 additions and 311 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -111,6 +111,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Undo deletion of endpoint config from cloudtrail fileset in {pull}29415[29415]. {pull}29450[29450]
 - Make Cisco ASA and FTD modules conform to the ECS definition for event.outcome and event.type. {issue}29581[29581] {pull}29698[29698]
 - ibmmq: Fixed `@timestamp` not being populated with correct values. {pull}29773[29773]
+- Fix using log_group_name_prefix in aws-cloudwatch input. {pull}29695[29695]
 - aws-s3: Improve gzip detection to avoid false negatives. {issue}29968[29968]

*Heartbeat*
Expand Down
8 changes: 7 additions & 1 deletion x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc
@@ -50,12 +50,18 @@ log_group_name is given.
 ==== `log_group_name_prefix`
 The prefix for a group of log group names. Note: `region_name` is required when
 `log_group_name_prefix` is given. `log_group_name` and `log_group_name_prefix`
-cannot be given at the same time.
+cannot be given at the same time. The number of workers that will process the
+log groups under this prefix is set through the `number_of_workers` config.
 
 [float]
 ==== `region_name`
 Region that the specified log group or log group prefix belongs to.
 
+[float]
+==== `number_of_workers`
+Number of workers that will process the log groups with the given `log_group_name_prefix`.
+Default value is 1.
+
 [float]
 ==== `log_streams`
 A list of strings of log stream names that Filebeat collects log events from.
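Putting these options together, a minimal sketch of an `aws-cloudwatch` input configuration (the prefix and region values here are hypothetical):

```yaml
filebeat.inputs:
- type: aws-cloudwatch
  # Collect from every log group whose name begins with this prefix.
  log_group_name_prefix: /aws/lambda/
  # region_name is required when log_group_name_prefix is given.
  region_name: us-east-1
  # Four workers process the matching log groups concurrently (default is 1).
  number_of_workers: 4
```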
@@ -0,0 +1,3 @@
terraform/
outputs.yml
*.tfstate*


46 changes: 46 additions & 0 deletions x-pack/filebeat/input/awscloudwatch/_meta/terraform/README.md
@@ -0,0 +1,46 @@
# Terraform setup for AWS CloudWatch Input Integration Tests

This directory contains a Terraform module that creates the AWS resources needed
for executing the integration tests for the `aws-cloudwatch` Filebeat input. It
creates two CloudWatch log groups and one log stream under each log group.

It writes configuration information consumed by the tests to `outputs.yml`. The
AWS resources are randomly named to prevent name collisions between multiple
users.

### Usage

You must have the appropriate AWS environment variables for authentication set
before running Terraform or the integration tests. The AWS key must be
authorized to create and destroy AWS CloudWatch log groups.

1. Initialize a working directory containing Terraform configuration files.

`terraform init`

2. Execute Terraform in this directory to create the resources. This also writes
`outputs.yml`. You can set `export TF_VAR_aws_region=NNNNN` to match the AWS
region of the profile you are using.

`terraform apply`


3. (Optional) View the output configuration.

```yaml
"aws_region": "us-east-1"
"log_group_name_1": "filebeat-cloudwatch-integtest-1-417koa"
"log_group_name_2": "filebeat-cloudwatch-integtest-2-417koa"
```
4. Execute the integration test.
```
cd x-pack/filebeat/input/awscloudwatch
go test -tags aws,integration -run TestInputRun.+ -v .
```

5. Clean up AWS resources. Execute Terraform to delete the log groups created for
testing.

`terraform destroy`
44 changes: 44 additions & 0 deletions x-pack/filebeat/input/awscloudwatch/_meta/terraform/main.tf
@@ -0,0 +1,44 @@
terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 3.52"
    }
  }
}

provider "aws" {
  region = var.aws_region
}

resource "random_string" "random" {
  length  = 6
  special = false
  upper   = false
}

resource "aws_cloudwatch_log_group" "filebeat-integtest-1" {
  name = "filebeat-log-group-integtest-1-${random_string.random.result}"

  tags = {
    Environment = "test"
  }
}

resource "aws_cloudwatch_log_group" "filebeat-integtest-2" {
  name = "filebeat-log-group-integtest-2-${random_string.random.result}"

  tags = {
    Environment = "test"
  }
}

resource "aws_cloudwatch_log_stream" "filebeat-integtest-1" {
  name           = "filebeat-log-stream-integtest-1-${random_string.random.result}"
  log_group_name = aws_cloudwatch_log_group.filebeat-integtest-1.name
}

resource "aws_cloudwatch_log_stream" "filebeat-integtest-2" {
  name           = "filebeat-log-stream-integtest-2-${random_string.random.result}"
  log_group_name = aws_cloudwatch_log_group.filebeat-integtest-2.name
}
11 changes: 11 additions & 0 deletions x-pack/filebeat/input/awscloudwatch/_meta/terraform/outputs.tf
@@ -0,0 +1,11 @@
resource "local_file" "secrets" {
content = yamlencode({
"log_group_name_1" : aws_cloudwatch_log_group.filebeat-integtest-1.name
"log_group_name_2" : aws_cloudwatch_log_group.filebeat-integtest-2.name
"log_stream_name_1" : aws_cloudwatch_log_stream.filebeat-integtest-1.name
"log_stream_name_2" : aws_cloudwatch_log_stream.filebeat-integtest-2.name
"aws_region" : var.aws_region
})
filename = "${path.module}/outputs.yml"
file_permission = "0644"
}
@@ -0,0 +1,5 @@
variable "aws_region" {
description = "AWS Region"
type = string
default = "us-east-1"
}
126 changes: 126 additions & 0 deletions x-pack/filebeat/input/awscloudwatch/cloudwatch.go
@@ -0,0 +1,126 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package awscloudwatch

import (
	"context"
	"sync"
	"time"

	awssdk "github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
	"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/cloudwatchlogsiface"
	"github.com/pkg/errors"

	"github.com/elastic/beats/v7/libbeat/logp"
	"github.com/elastic/beats/v7/libbeat/monitoring"
	"github.com/elastic/beats/v7/libbeat/statestore"
	awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
)

type cloudwatchPoller struct {
	numberOfWorkers      int
	apiSleep             time.Duration
	region               string
	logStreams           []string
	logStreamPrefix      string
	startTime            int64
	endTime              int64
	prevEndTime          int64
	workerSem            *awscommon.Sem
	log                  *logp.Logger
	metrics              *inputMetrics
	store                *statestore.Store
	workersListingMap    *sync.Map
	workersProcessingMap *sync.Map
}

func newCloudwatchPoller(log *logp.Logger, metrics *inputMetrics,
	store *statestore.Store,
	awsRegion string, apiSleep time.Duration,
	numberOfWorkers int, logStreams []string, logStreamPrefix string) *cloudwatchPoller {
	if metrics == nil {
		metrics = newInputMetrics(monitoring.NewRegistry(), "")
	}

	return &cloudwatchPoller{
		numberOfWorkers:      numberOfWorkers,
		apiSleep:             apiSleep,
		region:               awsRegion,
		logStreams:           logStreams,
		logStreamPrefix:      logStreamPrefix,
		startTime:            int64(0),
		endTime:              int64(0),
		workerSem:            awscommon.NewSem(numberOfWorkers),
		log:                  log,
		metrics:              metrics,
		store:                store,
		workersListingMap:    new(sync.Map),
		workersProcessingMap: new(sync.Map),
	}
}

func (p *cloudwatchPoller) run(svc cloudwatchlogsiface.ClientAPI, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) {
	err := p.getLogEventsFromCloudWatch(svc, logGroup, startTime, endTime, logProcessor)
	if err != nil {
		// Use a separate variable as the errors.As target so the original
		// error is not shadowed and is still what gets logged.
		var errRequestCanceled *awssdk.RequestCanceledError
		if errors.As(err, &errRequestCanceled) {
			p.log.Error("getLogEventsFromCloudWatch failed with RequestCanceledError: ", err)
		}
		p.log.Error("getLogEventsFromCloudWatch failed: ", err)
	}
}

// getLogEventsFromCloudWatch uses the FilterLogEvents API to collect logs from CloudWatch.
func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc cloudwatchlogsiface.ClientAPI, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) error {
	// construct FilterLogEventsInput
	filterLogEventsInput := p.constructFilterLogEventsInput(startTime, endTime, logGroup)

	// make API request
	req := svc.FilterLogEventsRequest(filterLogEventsInput)
	paginator := cloudwatchlogs.NewFilterLogEventsPaginator(req)
	for paginator.Next(context.TODO()) {
		page := paginator.CurrentPage()
		p.metrics.apiCallsTotal.Inc()

		logEvents := page.Events
		p.metrics.logEventsReceivedTotal.Add(uint64(len(logEvents)))

		// This sleep is to avoid hitting the FilterLogEvents API limit (5 transactions per second (TPS)/account/Region).
		p.log.Debugf("sleeping for %v before making FilterLogEvents API call again", p.apiSleep)
		time.Sleep(p.apiSleep)
		p.log.Debug("done sleeping")

		p.log.Debugf("Processing #%v events", len(logEvents))
		err := logProcessor.processLogEvents(logEvents, logGroup, p.region)
		if err != nil {
			err = errors.Wrap(err, "processLogEvents failed")
			p.log.Error(err)
		}
	}

	if err := paginator.Err(); err != nil {
		return errors.Wrap(err, "error FilterLogEvents with Paginator")
	}
	return nil
}

func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime int64, endTime int64, logGroup string) *cloudwatchlogs.FilterLogEventsInput {
	filterLogEventsInput := &cloudwatchlogs.FilterLogEventsInput{
		LogGroupName: awssdk.String(logGroup),
		StartTime:    awssdk.Int64(startTime),
		EndTime:      awssdk.Int64(endTime),
		Limit:        awssdk.Int64(100),
	}

	if len(p.logStreams) > 0 {
		filterLogEventsInput.LogStreamNames = p.logStreams
	}

	if p.logStreamPrefix != "" {
		filterLogEventsInput.LogStreamNamePrefix = awssdk.String(p.logStreamPrefix)
	}
	return filterLogEventsInput
}
12 changes: 7 additions & 5 deletions x-pack/filebeat/input/awscloudwatch/config.go
@@ -25,18 +25,20 @@ type config struct {
 	APITimeout time.Duration       `config:"api_timeout" validate:"min=0,nonzero"`
 	APISleep   time.Duration       `config:"api_sleep" validate:"min=0,nonzero"`
 	Latency    time.Duration       `config:"latency"`
-	AwsConfig  awscommon.ConfigAWS `config:",inline"`
+	NumberOfWorkers int                 `config:"number_of_workers"`
+	AWSConfig       awscommon.ConfigAWS `config:",inline"`
 }
 
 func defaultConfig() config {
 	return config{
 		ForwarderConfig: harvester.ForwarderConfig{
 			Type: "aws-cloudwatch",
 		},
-		StartPosition: "beginning",
-		ScanFrequency: 10 * time.Second,
-		APITimeout:    120 * time.Second,
-		APISleep:      200 * time.Millisecond, // FilterLogEvents has a limit of 5 transactions per second (TPS)/account/Region: 1s / 5 = 200 ms
+		StartPosition:   "beginning",
+		ScanFrequency:   10 * time.Second,
+		APITimeout:      120 * time.Second,
+		APISleep:        200 * time.Millisecond, // FilterLogEvents has a limit of 5 transactions per second (TPS)/account/Region: 1s / 5 = 200 ms
+		NumberOfWorkers: 1,
 	}
 }

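Read back as YAML, these defaults amount to the following input configuration (a sketch; `start_position` and `scan_frequency` are assumed config keys for the `StartPosition` and `ScanFrequency` fields, whose struct tags fall outside the hunk shown here):

```yaml
filebeat.inputs:
- type: aws-cloudwatch
  start_position: beginning # assumed key for StartPosition
  scan_frequency: 10s       # assumed key for ScanFrequency
  api_timeout: 120s
  api_sleep: 200ms          # 1s / 5 TPS FilterLogEvents limit
  number_of_workers: 1      # new default introduced by this change
```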