Skip to content

Commit

Permalink
some more work
Browse files Browse the repository at this point in the history
  • Loading branch information
lewis262626 committed Feb 27, 2023
1 parent b6c1f81 commit 401fb52
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 29 deletions.
3 changes: 1 addition & 2 deletions receiver/awscloudwatchmetricsreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsclo
go 1.19

require (
github.com/aws/aws-sdk-go v1.44.210
github.com/aws/aws-sdk-go-v2 v1.17.5
github.com/aws/aws-sdk-go-v2/config v1.8.3
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.25.3
github.com/stretchr/testify v1.8.1
Expand All @@ -15,7 +15,6 @@ require (
)

require (
github.com/aws/aws-sdk-go-v2 v1.17.5 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.4.3 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.6.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.29 // indirect
Expand Down
17 changes: 0 additions & 17 deletions receiver/awscloudwatchmetricsreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

74 changes: 64 additions & 10 deletions receiver/awscloudwatchmetricsreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,21 @@ package awscloudwatchmetricsreceiver // import "github.com/open-telemetry/opente

import (
"context"
"math"
"sync"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
)

type metricReceiver struct {
Expand All @@ -46,7 +49,7 @@ type metricReceiver struct {
}

type client interface {
GetMetricDataPagesWithContext(ctx aws.Context, input *cloudwatch.GetMetricDataInput, fn func(*cloudwatch.GetMetricDataOutput, bool) bool, opts ...request.Option) error
GetMetricData(ctx context.Context, params *cloudwatch.GetMetricDataInput, optFns ...func(*cloudwatch.Options)) (*cloudwatch.GetMetricDataOutput, error)
}

type metricsRequest interface {
Expand All @@ -59,16 +62,31 @@ type namedRequest struct {
MetricName string
Period time.Duration
AwsAggregation string
Dimensions []*namedRequestDimensions
Dimensions []types.Dimension
}

type namedRequestDimensions struct {
Name string
Value string
}
func (nr *namedRequest) request(st, et *time.Time, rqid string) *cloudwatch.GetMetricDataInput {
getMetricInput := &cloudwatch.GetMetricDataInput{
EndTime: et,
StartTime: st,
MetricDataQueries: []types.MetricDataQuery{
types.MetricDataQuery{
Id: aws.String(rqid),
MetricStat: &types.MetricStat{
Metric: &types.Metric{
Namespace: &nr.Namespace,
MetricName: &nr.MetricName,
Dimensions: nr.Dimensions,
},
Period: aws.Int32(int32(math.Abs(nr.Period.Seconds()))),
Stat: &nr.AwsAggregation,
},
ReturnData: aws.Bool(true),
},
},
}

func (nr *namedRequest) request(st, et *time.Time, rqid string) *cloudwatch.GetMetricDataOutput {
return nil
return getMetricInput
}

func (m *metricReceiver) configureAWSClient() error {
Expand Down Expand Up @@ -127,8 +145,44 @@ func (m *metricReceiver) poll(ctx context.Context) error {
}
}
m.nextStartTime = endTime
return errs
}

func (m *metricReceiver) pollForMetrics(ctx context.Context, r namedRequest, startTime time.Time, endTime time.Time) error {
select {
case _, ok := <-m.doneChan:
if !ok {
return nil
}
default:
filter := r.request(&startTime, &endTime, "test")
paginator := cloudwatch.NewGetMetricDataPaginator(m.client, filter)
for paginator.HasMorePages() {
output, err := paginator.NextPage(ctx)
if err != nil {
m.logger.Error("unable to retrive metric data from cloudwatch", zap.String("metric name", r.MetricName), zap.Error(err))
break
}
observedTime := pcommon.NewTimestampFromTime(time.Now())
metrics := m.parseMetrics(observedTime, r.Namespace, r.MetricName, output)
if metrics.MetricCount() > 0 {
if err = m.consumer.ConsumeMetrics(ctx, metrics); err != nil {
m.logger.Error("unable to consume logs", zap.Error(err))
break
}
}
}
}
return nil
}

func (m *metricReceiver) parseMetrics(observedTime pcommon.Timestamp, namespace, metricName string, output *cloudwatch.GetMetricDataOutput) pmetric.Metrics {
metrics := pmetric.NewMetrics()
for _, metric := range output.MetricDataResults {
if len(metric.Timestamps) < 1 {
m.logger.Error("no timestamps received from cloudwatch")
return metrics
}
}
return metrics
}

0 comments on commit 401fb52

Please sign in to comment.