diff --git a/aws/cwatch/service.go b/aws/cwatch/service.go index 5ae07ea..3d318fd 100644 --- a/aws/cwatch/service.go +++ b/aws/cwatch/service.go @@ -16,16 +16,19 @@ import ( type Service struct { Client Client namespace string - deployment types.Dimension + deployment string batcher *syncx.Batcher[types.MetricDatum] } -// NewService creates a new Cloudwatch service with the given credentials and configuration. If deployment is "dev" then -// then instead of a real Cloudwatch client, the service will get a mocked version that just logs metrics. +// NewService creates a new Cloudwatch service with the given credentials and configuration. Some behaviours depend on +// the given deployment value: +// - "test": metrics just logged, Queue(..) sends synchronously +// - "dev": metrics just logged, Queue(..) adds to batcher +// - "*": metrics sent to Cloudwatch, Queue(..) adds to batcher func NewService(accessKey, secretKey, region, namespace, deployment string) (*Service, error) { var client Client - if deployment == "dev" { + if deployment == "dev" || deployment == "test" { client = &DevClient{} } else { cfg, err := awsx.NewConfig(accessKey, secretKey, region) @@ -35,11 +38,7 @@ func NewService(accessKey, secretKey, region, namespace, deployment string) (*Se client = cloudwatch.NewFromConfig(cfg) } - return &Service{ - Client: client, - namespace: namespace, - deployment: types.Dimension{Name: aws.String("Deployment"), Value: aws.String(deployment)}, - }, nil + return &Service{Client: client, namespace: namespace, deployment: deployment}, nil } func (s *Service) StartQueue(wg *sync.WaitGroup, maxAge time.Duration) { @@ -57,14 +56,25 @@ func (s *Service) StopQueue() { s.batcher.Stop() } -func (s *Service) Queue(d types.MetricDatum) { - s.batcher.Queue(d) +func (s *Service) Queue(data ...types.MetricDatum) { + if s.deployment == "test" { + s.Send(context.TODO(), data...) + } else { + for _, d := range data { + s.batcher.Queue(d) + } + } +} + +func (s *Service) Send(ctx context.Context, data ...types.MetricDatum) error { + _, err := s.Client.PutMetricData(ctx, s.prepare(data)) + return err } -func (s *Service) Prepare(data []types.MetricDatum) *cloudwatch.PutMetricDataInput { +func (s *Service) prepare(data []types.MetricDatum) *cloudwatch.PutMetricDataInput { // add deployment as the first dimension to all metrics for i := range data { - data[i].Dimensions = append([]types.Dimension{s.deployment}, data[i].Dimensions...) + data[i].Dimensions = append([]types.Dimension{{Name: aws.String("Deployment"), Value: aws.String(s.deployment)}}, data[i].Dimensions...) } return &cloudwatch.PutMetricDataInput{ @@ -74,8 +84,7 @@ func (s *Service) Prepare(data []types.MetricDatum) *cloudwatch.PutMetricDataInp } func (s *Service) processBatch(batch []types.MetricDatum) { - _, err := s.Client.PutMetricData(context.TODO(), s.Prepare(batch)) - if err != nil { + if err := s.Send(context.TODO(), batch...); err != nil { slog.Error("error sending metric data batch", "error", err, "count", len(batch)) } } diff --git a/aws/cwatch/service_test.go b/aws/cwatch/service_test.go index 6f844aa..083f013 100644 --- a/aws/cwatch/service_test.go +++ b/aws/cwatch/service_test.go @@ -7,51 +7,39 @@ import ( "time" "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/service/cloudwatch" "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types" "github.com/nyaruka/gocommon/aws/cwatch" "github.com/stretchr/testify/assert" ) func TestService(t *testing.T) { - svc, err := cwatch.NewService("root", "key", "us-east-1", "Foo", "dev") + // create service for test environment + svc, err := cwatch.NewService("root", "key", "us-east-1", "Foo", "test") assert.NoError(t, err) - assert.Equal(t, &cloudwatch.PutMetricDataInput{ - Namespace: aws.String("Foo"), - MetricData: []types.MetricDatum{ - { - MetricName: aws.String("NumGoats"), - Dimensions: []types.Dimension{ - {Name: aws.String("Deployment"), Value: aws.String("dev")}, - }, - Value: aws.Float64(10), - }, - { - MetricName: aws.String("NumSheep"), - Dimensions: []types.Dimension{ - {Name: aws.String("Deployment"), Value: aws.String("dev")}, - {Name: aws.String("Host"), Value: aws.String("foo1")}, - }, - Value: aws.Float64(20), - }, - }, - }, svc.Prepare([]types.MetricDatum{ - {MetricName: aws.String("NumGoats"), Value: aws.Float64(10)}, - {MetricName: aws.String("NumSheep"), Dimensions: []types.Dimension{{Name: aws.String("Host"), Value: aws.String("foo1")}}, Value: aws.Float64(20)}, - })) + err = svc.Send(context.Background(), types.MetricDatum{MetricName: aws.String("NumSheep"), Dimensions: []types.Dimension{{Name: aws.String("Host"), Value: aws.String("foo1")}}, Value: aws.Float64(20)}) + assert.NoError(t, err) + assert.Equal(t, 1, svc.Client.(*cwatch.DevClient).CallCount()) + + // check Queue sends synchronously + svc.Queue(types.MetricDatum{MetricName: aws.String("NumGoats"), Value: aws.Float64(10), Unit: types.StandardUnitCount}) + assert.Equal(t, 2, svc.Client.(*cwatch.DevClient).CallCount()) + + // create service for dev environment + svc, err = cwatch.NewService("root", "key", "us-east-1", "Foo", "dev") + assert.NoError(t, err) wg := &sync.WaitGroup{} svc.StartQueue(wg, time.Millisecond*100) - // test writing metrics directly via the client - _, err = svc.Client.PutMetricData(context.Background(), svc.Prepare([]types.MetricDatum{ - {MetricName: aws.String("NumGoats"), Value: aws.Float64(10), Unit: types.StandardUnitCount}, - {MetricName: aws.String("NumSheep"), Dimensions: []types.Dimension{{Name: aws.String("Host"), Value: aws.String("foo1")}}, Value: aws.Float64(20), Unit: types.StandardUnitCount}, - })) - assert.NoError(t, err) + svc.Queue(types.MetricDatum{MetricName: aws.String("NumGoats"), Value: aws.Float64(10), Unit: types.StandardUnitCount}) + svc.Queue(types.MetricDatum{MetricName: aws.String("NumSheep"), Value: aws.Float64(20), Unit: types.StandardUnitCount}) + assert.Equal(t, 0, svc.Client.(*cwatch.DevClient).CallCount()) // not sent yet + + time.Sleep(time.Millisecond * 200) + + assert.Equal(t, 1, svc.Client.(*cwatch.DevClient).CallCount()) // sent as one call - // test queuing metrics to be sent by batching process svc.Queue(types.MetricDatum{MetricName: aws.String("SleepTime"), Value: aws.Float64(30), Unit: types.StandardUnitSeconds}) svc.StopQueue() @@ -59,4 +47,9 @@ func TestService(t *testing.T) { // check the queued metric was sent assert.Equal(t, 2, svc.Client.(*cwatch.DevClient).CallCount()) + + // create service for prod environment + svc, err = cwatch.NewService("root", "key", "us-east-1", "Foo", "prod") + assert.NoError(t, err) + assert.NotNil(t, svc) }