Skip to content

Commit

Permalink
[Metricbeat] add collecting tags for rds metricset (#16605)
Browse files Browse the repository at this point in the history
* add ListTagsForResourceRequest for rds metricset
  • Loading branch information
kaiyan-sheng authored Mar 10, 2020
1 parent 1516064 commit df3eb93
Show file tree
Hide file tree
Showing 12 changed files with 385 additions and 55 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add filtering option for prometheus collector. {pull}16420[16420]
- Add metricsets based on Ceph Manager Daemon to the `ceph` module. {issue}7723[7723] {pull}16254[16254]
- Release `statsd` module as GA. {pull}16447[16447] {issue}14280[14280]
- Add collecting tags and tags_filter for rds metricset in aws module. {pull}16605[16605] {issue}16358[16358]
- Add OpenMetrics Metricbeat module {pull}16596[16596]
- Add `cloudfoundry` module to send events from Cloud Foundry. {pull}16671[16671]
- Add `redisenterprise` module. {pull}16482[16482] {issue}15269[15269]
Expand Down
5 changes: 4 additions & 1 deletion metricbeat/docs/modules/aws.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Currently, we have `billing`, `cloudwatch`, `dynamodb`, `ebs`, `ec2`, `elb`,
`lambda`, `rds`, `s3_daily_storage`, `s3_request`, `sns`, `sqs` and `usage`
metricset in `aws` module.

Collecting `tags` for `ec2`, `cloudwatch`, and metricset created based on
Collecting `tags` for `ec2`, `rds`, `cloudwatch`, and metricset created based on
`cloudwatch` using light module is supported.

* *tags.*: Tag key value pairs from aws resources. A tag is a label that user assigns to an AWS resource.
Expand Down Expand Up @@ -301,6 +301,9 @@ metricbeat.modules:
- module: aws
period: 60s
credential_profile_name: test-mb
tags_filter:
- key: "dept"
value: "eng"
metricsets:
- rds
----
Expand Down
3 changes: 3 additions & 0 deletions x-pack/metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ metricbeat.modules:
- module: aws
period: 60s
credential_profile_name: test-mb
tags_filter:
- key: "dept"
value: "eng"
metricsets:
- rds

Expand Down
3 changes: 3 additions & 0 deletions x-pack/metricbeat/module/aws/_meta/config.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,8 @@
- module: aws
period: 60s
credential_profile_name: test-mb
tags_filter:
- key: "dept"
value: "eng"
metricsets:
- rds
2 changes: 1 addition & 1 deletion x-pack/metricbeat/module/aws/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Currently, we have `billing`, `cloudwatch`, `dynamodb`, `ebs`, `ec2`, `elb`,
`lambda`, `rds`, `s3_daily_storage`, `s3_request`, `sns`, `sqs` and `usage`
metricset in `aws` module.

Collecting `tags` for `ec2`, `cloudwatch`, and metricset created based on
Collecting `tags` for `ec2`, `rds`, `cloudwatch`, and metricset created based on
`cloudwatch` using light module is supported.

* *tags.*: Tag key value pairs from aws resources. A tag is a label that user assigns to an AWS resource.
Expand Down
16 changes: 13 additions & 3 deletions x-pack/metricbeat/module/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/aws/aws-sdk-go-v2/service/ec2/ec2iface"
"github.com/aws/aws-sdk-go-v2/service/iam"
"github.com/aws/aws-sdk-go-v2/service/rds"
"github.com/aws/aws-sdk-go-v2/service/resourcegroupstaggingapi"
"github.com/aws/aws-sdk-go-v2/service/sts"
"github.com/pkg/errors"
Expand All @@ -23,9 +24,10 @@ import (

// Config defines all required and optional parameters for aws metricsets
type Config struct {
Period time.Duration `config:"period" validate:"nonzero,required"`
Regions []string `config:"regions"`
AWSConfig awscommon.ConfigAWS `config:",inline"`
Period time.Duration `config:"period" validate:"nonzero,required"`
Regions []string `config:"regions"`
AWSConfig awscommon.ConfigAWS `config:",inline"`
TagsFilter []Tag `config:"tags_filter"`
}

// MetricSet is the base metricset for all aws metricsets
Expand All @@ -37,6 +39,7 @@ type MetricSet struct {
AwsConfig *awssdk.Config
AccountName string
AccountID string
TagsFilter []Tag
}

// Tag holds a configuration specific for ec2 and cloudwatch metricset.
Expand Down Expand Up @@ -84,6 +87,7 @@ func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) {
BaseMetricSet: base,
Period: config.Period,
AwsConfig: &awsConfig,
TagsFilter: config.TagsFilter,
}

// Get IAM account name
Expand Down Expand Up @@ -194,6 +198,12 @@ func CheckTagFiltersExist(tagsFilter []Tag, tags interface{}) bool {
tagKeys = append(tagKeys, *tag.Key)
tagValues = append(tagValues, *tag.Value)
}
case []rds.Tag:
tagsRDS := tags.([]rds.Tag)
for _, tag := range tagsRDS {
tagKeys = append(tagKeys, *tag.Key)
tagValues = append(tagValues, *tag.Value)
}
}

for _, tagFilter := range tagsFilter {
Expand Down
16 changes: 4 additions & 12 deletions x-pack/metricbeat/module/aws/ec2/ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func init() {
// interface methods except for Fetch.
type MetricSet struct {
*aws.MetricSet
TagsFilter []aws.Tag `config:"tags_filter"`
}

// New creates a new instance of the MetricSet. New is responsible for unpacking
Expand All @@ -56,15 +55,6 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return nil, errors.Wrap(err, "error creating aws metricset")
}

config := struct {
Tags []aws.Tag `config:"tags_filter"`
}{}

err = base.Module().UnpackConfig(&config)
if err != nil {
return nil, errors.Wrap(err, "error unpack raw module config using UnpackConfig")
}

// Check if period is set to be multiple of 60s or 300s
remainder300 := int(metricSet.Period.Seconds()) % 300
remainder60 := int(metricSet.Period.Seconds()) % 60
Expand All @@ -76,8 +66,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
}

return &MetricSet{
MetricSet: metricSet,
TagsFilter: config.Tags,
MetricSet: metricSet,
}, nil
}

Expand Down Expand Up @@ -201,6 +190,9 @@ func (m *MetricSet) createCloudWatchEvents(getMetricDataResults []cloudwatch.Met
// If tag filter doesn't exist in tagKeys/tagValues,
// then do not report this event/instance.
if exists := aws.CheckTagFiltersExist(m.TagsFilter, tags); !exists {
// if tag filter doesn't exist, remove this event initial
// entry to avoid report an empty event.
delete(events, instanceID)
continue
}
}
Expand Down
140 changes: 139 additions & 1 deletion x-pack/metricbeat/module/aws/ec2/ec2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ func TestCreateCloudWatchEventsDedotTags(t *testing.T) {

metricSet := MetricSet{
&aws.MetricSet{},
nil,
}
events, err := metricSet.createCloudWatchEvents(getMetricDataOutput, instancesOutputs, "us-west-1")
assert.NoError(t, err)
Expand All @@ -222,6 +221,145 @@ func TestCreateCloudWatchEventsDedotTags(t *testing.T) {
assert.Equal(t, expectedEvent.MetricSetFields["tags"], events[instanceID].ModuleFields["tags"])
}

func TestCreateCloudWatchEventsWithTagsFilter(t *testing.T) {
expectedEvent := mb.Event{
RootFields: common.MapStr{
"cloud": common.MapStr{
"region": regionName,
"provider": "aws",
"instance": common.MapStr{"id": "i-123"},
"machine": common.MapStr{"type": "t2.medium"},
"availability_zone": "us-west-1a",
},
},
MetricSetFields: common.MapStr{
"cpu": common.MapStr{
"total": common.MapStr{"pct": 0.25},
},
"instance": common.MapStr{
"image": common.MapStr{"id": "image-123"},
"core": common.MapStr{"count": int64(1)},
"threads_per_core": int64(1),
"state": common.MapStr{"code": int64(16), "name": "running"},
"monitoring": common.MapStr{"state": "disabled"},
"public": common.MapStr{
"dns_name": "ec2-1-2-3-4.us-west-1.compute.amazonaws.com",
"ip": "1.2.3.4",
},
"private": common.MapStr{
"dns_name": "ip-5-6-7-8.us-west-1.compute.internal",
"ip": "5.6.7.8",
},
},
"tags": common.MapStr{
"app_kubernetes_io/name": "foo",
"helm_sh/chart": "foo-chart",
},
},
}

svcEC2Mock := &MockEC2Client{}
instanceIDs, instancesOutputs, err := getInstancesPerRegion(svcEC2Mock)
assert.NoError(t, err)
assert.Equal(t, 1, len(instanceIDs))
instanceID := instanceIDs[0]
assert.Equal(t, instanceID, instanceID)
timestamp := time.Now()

getMetricDataOutput := []cloudwatch.MetricDataResult{
{
Id: &id1,
Label: &label1,
Values: []float64{0.25},
Timestamps: []time.Time{timestamp},
},
{
Id: &id2,
Label: &label2,
Values: []float64{0.0},
Timestamps: []time.Time{timestamp},
},
{
Id: &id3,
Label: &label3,
Values: []float64{0.0},
Timestamps: []time.Time{timestamp},
},
{
Id: &id4,
Label: &label4,
Values: []float64{0.0},
Timestamps: []time.Time{timestamp},
},
}

metricSet := MetricSet{
&aws.MetricSet{
TagsFilter: []aws.Tag{{
Key: "app.kubernetes.io/name",
Value: "foo",
}},
},
}
events, err := metricSet.createCloudWatchEvents(getMetricDataOutput, instancesOutputs, "us-west-1")

assert.NoError(t, err)
assert.Equal(t, 1, len(events))
assert.Equal(t, expectedEvent.RootFields, events[instanceID].RootFields)
assert.Equal(t, expectedEvent.MetricSetFields["cpu"], events[instanceID].MetricSetFields["cpu"])
assert.Equal(t, expectedEvent.MetricSetFields["instance"], events[instanceID].MetricSetFields["instance"])
assert.Equal(t, expectedEvent.MetricSetFields["tags"], events[instanceID].ModuleFields["tags"])
}

func TestCreateCloudWatchEventsWithNotMatchingTagsFilter(t *testing.T) {
svcEC2Mock := &MockEC2Client{}
instanceIDs, instancesOutputs, err := getInstancesPerRegion(svcEC2Mock)
assert.NoError(t, err)
assert.Equal(t, 1, len(instanceIDs))
instanceID := instanceIDs[0]
assert.Equal(t, instanceID, instanceID)
timestamp := time.Now()

getMetricDataOutput := []cloudwatch.MetricDataResult{
{
Id: &id1,
Label: &label1,
Values: []float64{0.25},
Timestamps: []time.Time{timestamp},
},
{
Id: &id2,
Label: &label2,
Values: []float64{0.0},
Timestamps: []time.Time{timestamp},
},
{
Id: &id3,
Label: &label3,
Values: []float64{0.0},
Timestamps: []time.Time{timestamp},
},
{
Id: &id4,
Label: &label4,
Values: []float64{0.0},
Timestamps: []time.Time{timestamp},
},
}

metricSet := MetricSet{
&aws.MetricSet{
TagsFilter: []aws.Tag{{
Key: "app_kubernetes_io/name",
Value: "not_foo",
}},
},
}
events, err := metricSet.createCloudWatchEvents(getMetricDataOutput, instancesOutputs, "us-west-1")
assert.NoError(t, err)
assert.Equal(t, 0, len(events))
}

func TestConstructMetricQueries(t *testing.T) {
name := "InstanceId"
dim := cloudwatch.Dimension{
Expand Down
52 changes: 31 additions & 21 deletions x-pack/metricbeat/module/aws/rds/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,61 +3,71 @@
"aws": {
"rds": {
"aurora_bin_log_replica_lag": 0,
"aurora_replica.lag.ms": 19.683,
"aurora_replica.lag_max.ms": 19.651500701904297,
"aurora_replica.lag_min.ms": 19.651500701904297,
"aurora_replica.lag_max.ms": 19.108999252319336,
"aurora_replica.lag_min.ms": 19.108999252319336,
"cache_hit_ratio.buffer": 100,
"cache_hit_ratio.result_set": 0,
"cpu": {
"total": {
"pct": 0.035
"pct": 0.04
}
},
"database_connections": 0,
"db_instance.class": "db.r5.large",
"db_instance": {
"arn": "arn:aws:rds:us-east-1:428152502467:db:database-1-instance-1",
"class": "db.r5.large",
"identifier": "database-1-instance-1",
"status": "available"
},
"db_instance.identifier": "database-1-instance-1",
"deadlocks": 0,
"disk_usage": {
"bin_log.bytes": 0
},
"engine_uptime.sec": 278598.25,
"free_local_storage.bytes": 33112842240,
"freeable_memory.bytes": 4879514624,
"engine_uptime.sec": 2704277,
"free_local_storage.bytes": 31745863680,
"freeable_memory.bytes": 4634234880,
"latency": {
"commit": 2.9322999999999997,
"commit": 5.270933333333333,
"ddl": 0,
"delete": 0,
"dml": 0.08325833333333334,
"insert": 0.08325833333333334,
"select": 0.20890321021571023,
"dml": 0.1624,
"insert": 0.1624,
"select": 0.17333862433862435,
"update": 0
},
"login_failures": 0,
"queries": 7.807070323085375,
"queries": 9.11833836203304,
"throughput": {
"commit": 0.2500020925349523,
"commit": 0.5000916834753039,
"ddl": 0,
"delete": 0,
"dml": 0.2500020925349523,
"insert": 0.2500020925349523,
"network": 1.7498192019524124,
"network_receive": 0.8749096009762062,
"network_transmit": 0.8749096009762062,
"select": 2.8751975752392616,
"dml": 0.5000916834753039,
"insert": 0.5000916834753039,
"network": 1.4,
"network_receive": 0.7,
"network_transmit": 0.7,
"select": 3.150577605894414,
"update": 0
},
"transactions": {
"active": 0,
"blocked": 0
}
},
"tags": {
"created-by": "ks",
"dept": "engr"
}
},
"cloud": {
"account": {
"id": "428152502467",
"name": "elastic-beats"
},
"availability_zone": "us-east-1b",
"provider": "aws",
"region": "eu-west-1"
"region": "us-east-1"
},
"event": {
"dataset": "aws.rds",
Expand Down
Loading

0 comments on commit df3eb93

Please sign in to comment.