Skip to content

Commit

Permalink
[Metricbeat] Use timestamp from CloudWatch for events (elastic#21498)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaiyan-sheng authored Oct 6, 2020
1 parent 31cf0ac commit b4c273c
Show file tree
Hide file tree
Showing 10 changed files with 255 additions and 224 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix remote_write flaky test. {pull}21173[21173]
- Visualization title fixes in aws, azure and googlecloud compute dashboards. {pull}21098[21098]
- Add a switch to the driver definition on SQL module to use pretty names {pull}17378[17378]
- Use timestamp from CloudWatch API when creating events. {pull}21498[21498]

*Packetbeat*

Expand Down
13 changes: 8 additions & 5 deletions x-pack/metricbeat/module/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,14 @@ func StringInSlice(str string, list []string) (bool, int) {
}

// InitEvent initialize mb.Event with basic information like service.name, cloud.provider
func InitEvent(regionName string, accountName string, accountID string) mb.Event {
event := mb.Event{}
event.MetricSetFields = common.MapStr{}
event.ModuleFields = common.MapStr{}
event.RootFields = common.MapStr{}
func InitEvent(regionName string, accountName string, accountID string, timestamp time.Time) mb.Event {
event := mb.Event{
Timestamp: timestamp,
MetricSetFields: common.MapStr{},
ModuleFields: common.MapStr{},
RootFields: common.MapStr{},
}

event.RootFields.Put("cloud.provider", "aws")
if regionName != "" {
event.RootFields.Put("cloud.region", regionName)
Expand Down
38 changes: 20 additions & 18 deletions x-pack/metricbeat/module/aws/billing/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,26 +178,28 @@ func (m *MetricSet) getCloudWatchBillingMetrics(

// Find a timestamp for all metrics in output
timestamp := aws.FindTimestamp(metricDataOutput)
if !timestamp.IsZero() {
for _, output := range metricDataOutput {
if len(output.Values) == 0 {
continue
}
exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
if exists {
labels := strings.Split(*output.Label, labelSeparator)
if timestamp.IsZero() {
return nil
}

for _, output := range metricDataOutput {
if len(output.Values) == 0 {
continue
}
exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
if exists {
labels := strings.Split(*output.Label, labelSeparator)

event := aws.InitEvent("", m.AccountName, m.AccountID)
event.MetricSetFields.Put(labels[0], output.Values[timestampIdx])
event := aws.InitEvent("", m.AccountName, m.AccountID, timestamp)
event.MetricSetFields.Put(labels[0], output.Values[timestampIdx])

i := 1
for i < len(labels)-1 {
event.MetricSetFields.Put(labels[i], labels[i+1])
i += 2
}
event.Timestamp = endTime
events = append(events, event)
i := 1
for i < len(labels)-1 {
event.MetricSetFields.Put(labels[i], labels[i+1])
i += 2
}
event.Timestamp = endTime
events = append(events, event)
}
}
return events
Expand Down Expand Up @@ -278,7 +280,7 @@ func (m *MetricSet) getCostGroupBy(svcCostExplorer costexploreriface.ClientAPI,
}

func (m *MetricSet) addCostMetrics(metrics map[string]costexplorer.MetricValue, groupDefinition costexplorer.GroupDefinition, startDate string, endDate string) mb.Event {
event := aws.InitEvent("", m.AccountName, m.AccountID)
event := aws.InitEvent("", m.AccountName, m.AccountID, time.Now())

// add group definition
event.MetricSetFields.Put("group_definition", common.MapStr{
Expand Down
105 changes: 52 additions & 53 deletions x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,34 +499,35 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes

// Find a timestamp for all metrics in output
timestamp := aws.FindTimestamp(metricDataResults)
if timestamp.IsZero() {
return nil, nil
}

// Create events when there is no tags_filter or resource_type specified.
if len(resourceTypeTagFilters) == 0 {
if !timestamp.IsZero() {
for _, output := range metricDataResults {
if len(output.Values) == 0 {
continue
}
for _, output := range metricDataResults {
if len(output.Values) == 0 {
continue
}

exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
if exists {
labels := strings.Split(*output.Label, labelSeparator)
if len(labels) != 5 {
// when there is no identifier value in label, use region+accountID+namespace instead
identifier := regionName + m.AccountID + labels[namespaceIdx]
if _, ok := events[identifier]; !ok {
events[identifier] = aws.InitEvent(regionName, m.AccountName, m.AccountID)
}
events[identifier] = insertRootFields(events[identifier], output.Values[timestampIdx], labels)
continue
exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
if exists {
labels := strings.Split(*output.Label, labelSeparator)
if len(labels) != 5 {
// when there is no identifier value in label, use region+accountID+namespace instead
identifier := regionName + m.AccountID + labels[namespaceIdx]
if _, ok := events[identifier]; !ok {
events[identifier] = aws.InitEvent(regionName, m.AccountName, m.AccountID, timestamp)
}
events[identifier] = insertRootFields(events[identifier], output.Values[timestampIdx], labels)
continue
}

identifierValue := labels[identifierValueIdx]
if _, ok := events[identifierValue]; !ok {
events[identifierValue] = aws.InitEvent(regionName, m.AccountName, m.AccountID)
}
events[identifierValue] = insertRootFields(events[identifierValue], output.Values[timestampIdx], labels)
identifierValue := labels[identifierValueIdx]
if _, ok := events[identifierValue]; !ok {
events[identifierValue] = aws.InitEvent(regionName, m.AccountName, m.AccountID, timestamp)
}
events[identifierValue] = insertRootFields(events[identifierValue], output.Values[timestampIdx], labels)
}
}
return events, nil
Expand Down Expand Up @@ -556,45 +557,43 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes
m.logger.Debugf("In region %s, service %s tags match tags_filter", regionName, identifier)
}

if !timestamp.IsZero() {
for _, output := range metricDataResults {
if len(output.Values) == 0 {
continue
}
for _, output := range metricDataResults {
if len(output.Values) == 0 {
continue
}

exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
if exists {
labels := strings.Split(*output.Label, labelSeparator)
if len(labels) != 5 {
// if there is no tag in labels but there is a tagsFilter, then no event should be reported.
if len(tagsFilter) != 0 {
continue
}

// when there is no identifier value in label, use region+accountID+namespace instead
identifier := regionName + m.AccountID + labels[namespaceIdx]
if _, ok := events[identifier]; !ok {
events[identifier] = aws.InitEvent(regionName, m.AccountName, m.AccountID)
}
events[identifier] = insertRootFields(events[identifier], output.Values[timestampIdx], labels)
exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
if exists {
labels := strings.Split(*output.Label, labelSeparator)
if len(labels) != 5 {
// if there is no tag in labels but there is a tagsFilter, then no event should be reported.
if len(tagsFilter) != 0 {
continue
}

identifierValue := labels[identifierValueIdx]
if _, ok := events[identifierValue]; !ok {
// when tagsFilter is not empty but no entry in
// resourceTagMap for this identifier, do not initialize
// an event for this identifier.
if len(tagsFilter) != 0 && resourceTagMap[identifierValue] == nil {
continue
}
events[identifierValue] = aws.InitEvent(regionName, m.AccountName, m.AccountID)
// when there is no identifier value in label, use region+accountID+namespace instead
identifier := regionName + m.AccountID + labels[namespaceIdx]
if _, ok := events[identifier]; !ok {
events[identifier] = aws.InitEvent(regionName, m.AccountName, m.AccountID, timestamp)
}
events[identifierValue] = insertRootFields(events[identifierValue], output.Values[timestampIdx], labels)
events[identifier] = insertRootFields(events[identifier], output.Values[timestampIdx], labels)
continue
}

// add tags to event based on identifierValue
insertTags(events, identifierValue, resourceTagMap)
identifierValue := labels[identifierValueIdx]
if _, ok := events[identifierValue]; !ok {
// when tagsFilter is not empty but no entry in
// resourceTagMap for this identifier, do not initialize
// an event for this identifier.
if len(tagsFilter) != 0 && resourceTagMap[identifierValue] == nil {
continue
}
events[identifierValue] = aws.InitEvent(regionName, m.AccountName, m.AccountID, timestamp)
}
events[identifierValue] = insertRootFields(events[identifierValue], output.Values[timestampIdx], labels)

// add tags to event based on identifierValue
insertTags(events, identifierValue, resourceTagMap)
}
}
}
Expand Down
39 changes: 32 additions & 7 deletions x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ import (
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/mb"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/cloudwatchiface"
Expand All @@ -22,12 +19,14 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/x-pack/metricbeat/module/aws"
)

var (
regionName = "us-west-1"
timestamp = time.Now()
timestamp = time.Date(2020, 10, 06, 00, 00, 00, 0, time.UTC)
accountID = "123456789012"
accountName = "test"

Expand Down Expand Up @@ -1466,9 +1465,9 @@ func TestInsertTags(t *testing.T) {
tagValue3 := "dev"

events := map[string]mb.Event{}
events[identifier1] = aws.InitEvent(regionName, accountName, accountID)
events[identifier2] = aws.InitEvent(regionName, accountName, accountID)
events[identifierContainsArn] = aws.InitEvent(regionName, accountName, accountID)
events[identifier1] = aws.InitEvent(regionName, accountName, accountID, timestamp)
events[identifier2] = aws.InitEvent(regionName, accountName, accountID, timestamp)
events[identifierContainsArn] = aws.InitEvent(regionName, accountName, accountID, timestamp)

resourceTagMap := map[string][]resourcegroupstaggingapi.Tag{}
resourceTagMap["test-s3-1"] = []resourcegroupstaggingapi.Tag{
Expand Down Expand Up @@ -1569,3 +1568,29 @@ func TestConfigDimensionValueContainsWildcard(t *testing.T) {
})
}
}

func TestCreateEventsTimestamp(t *testing.T) {
m := MetricSet{
logger: logp.NewLogger("test"),
CloudwatchConfigs: []Config{{Statistic: []string{"Average"}}},
MetricSet: &aws.MetricSet{Period: 5, AccountID: accountID},
}

listMetricWithStatsTotal := []metricsWithStatistics{
{
cloudwatch.Metric{
MetricName: awssdk.String("CPUUtilization"),
Namespace: awssdk.String("AWS/EC2"),
},
[]string{"Average"},
nil,
},
}

resourceTypeTagFilters := map[string][]aws.Tag{}
startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period, m.MetricSet.Latency)

events, err := m.createEvents(&MockCloudWatchClientWithoutDim{}, &MockResourceGroupsTaggingClient{}, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime)
assert.NoError(t, err)
assert.Equal(t, timestamp, events[regionName+accountID+namespace].Timestamp)
}
Loading

0 comments on commit b4c273c

Please sign in to comment.