Skip to content

Commit

Permalink
Add special handling deployment=test in cloudwatch service
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Dec 13, 2024
1 parent dc71b8f commit 4c2c887
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 47 deletions.
39 changes: 24 additions & 15 deletions aws/cwatch/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@ import (
type Service struct {
Client Client
namespace string
deployment types.Dimension
deployment string
batcher *syncx.Batcher[types.MetricDatum]
}

// NewService creates a new Cloudwatch service with the given credentials and configuration. If deployment is "dev" then
// then instead of a real Cloudwatch client, the service will get a mocked version that just logs metrics.
// NewService creates a new Cloudwatch service with the given credentials and configuration. Some behaviours depend on
// the given deployment value:
// - "test": metrics just logged, Queue(..) sends synchronously
// - "dev": metrics just logged, Queue(..) adds to batcher
// - "*": metrics sent to Cloudwatch, Queue(..) adds to batcher
func NewService(accessKey, secretKey, region, namespace, deployment string) (*Service, error) {
var client Client

if deployment == "dev" {
if deployment == "dev" || deployment == "test" {
client = &DevClient{}
} else {
cfg, err := awsx.NewConfig(accessKey, secretKey, region)
Expand All @@ -35,11 +38,7 @@ func NewService(accessKey, secretKey, region, namespace, deployment string) (*Se
client = cloudwatch.NewFromConfig(cfg)
}

return &Service{
Client: client,
namespace: namespace,
deployment: types.Dimension{Name: aws.String("Deployment"), Value: aws.String(deployment)},
}, nil
return &Service{Client: client, namespace: namespace, deployment: deployment}, nil
}

func (s *Service) StartQueue(wg *sync.WaitGroup, maxAge time.Duration) {
Expand All @@ -57,14 +56,25 @@ func (s *Service) StopQueue() {
s.batcher.Stop()
}

func (s *Service) Queue(d types.MetricDatum) {
s.batcher.Queue(d)
func (s *Service) Queue(data ...types.MetricDatum) {
if s.deployment == "test" {
s.Send(context.TODO(), data...)
} else {
for _, d := range data {
s.batcher.Queue(d)
}
}
}

func (s *Service) Send(ctx context.Context, data ...types.MetricDatum) error {
_, err := s.Client.PutMetricData(ctx, s.prepare(data))
return err
}

func (s *Service) Prepare(data []types.MetricDatum) *cloudwatch.PutMetricDataInput {
func (s *Service) prepare(data []types.MetricDatum) *cloudwatch.PutMetricDataInput {
// add deployment as the first dimension to all metrics
for i := range data {
data[i].Dimensions = append([]types.Dimension{s.deployment}, data[i].Dimensions...)
data[i].Dimensions = append([]types.Dimension{{Name: aws.String("Deployment"), Value: aws.String(s.deployment)}}, data[i].Dimensions...)
}

return &cloudwatch.PutMetricDataInput{
Expand All @@ -74,8 +84,7 @@ func (s *Service) Prepare(data []types.MetricDatum) *cloudwatch.PutMetricDataInp
}

func (s *Service) processBatch(batch []types.MetricDatum) {
_, err := s.Client.PutMetricData(context.TODO(), s.Prepare(batch))
if err != nil {
if err := s.Send(context.TODO(), batch...); err != nil {
slog.Error("error sending metric data batch", "error", err, "count", len(batch))
}
}
57 changes: 25 additions & 32 deletions aws/cwatch/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,56 +7,49 @@ import (
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/stretchr/testify/assert"
)

func TestService(t *testing.T) {
svc, err := cwatch.NewService("root", "key", "us-east-1", "Foo", "dev")
// create service for test environment
svc, err := cwatch.NewService("root", "key", "us-east-1", "Foo", "test")
assert.NoError(t, err)

assert.Equal(t, &cloudwatch.PutMetricDataInput{
Namespace: aws.String("Foo"),
MetricData: []types.MetricDatum{
{
MetricName: aws.String("NumGoats"),
Dimensions: []types.Dimension{
{Name: aws.String("Deployment"), Value: aws.String("dev")},
},
Value: aws.Float64(10),
},
{
MetricName: aws.String("NumSheep"),
Dimensions: []types.Dimension{
{Name: aws.String("Deployment"), Value: aws.String("dev")},
{Name: aws.String("Host"), Value: aws.String("foo1")},
},
Value: aws.Float64(20),
},
},
}, svc.Prepare([]types.MetricDatum{
{MetricName: aws.String("NumGoats"), Value: aws.Float64(10)},
{MetricName: aws.String("NumSheep"), Dimensions: []types.Dimension{{Name: aws.String("Host"), Value: aws.String("foo1")}}, Value: aws.Float64(20)},
}))
err = svc.Send(context.Background(), types.MetricDatum{MetricName: aws.String("NumSheep"), Dimensions: []types.Dimension{{Name: aws.String("Host"), Value: aws.String("foo1")}}, Value: aws.Float64(20)})
assert.NoError(t, err)
assert.Equal(t, 1, svc.Client.(*cwatch.DevClient).CallCount())

// check Queue sends synchronously
svc.Queue(types.MetricDatum{MetricName: aws.String("NumGoats"), Value: aws.Float64(10), Unit: types.StandardUnitCount})
assert.Equal(t, 2, svc.Client.(*cwatch.DevClient).CallCount())

// create service for dev environment
svc, err = cwatch.NewService("root", "key", "us-east-1", "Foo", "dev")
assert.NoError(t, err)

wg := &sync.WaitGroup{}
svc.StartQueue(wg, time.Millisecond*100)

// test writing metrics directly via the client
_, err = svc.Client.PutMetricData(context.Background(), svc.Prepare([]types.MetricDatum{
{MetricName: aws.String("NumGoats"), Value: aws.Float64(10), Unit: types.StandardUnitCount},
{MetricName: aws.String("NumSheep"), Dimensions: []types.Dimension{{Name: aws.String("Host"), Value: aws.String("foo1")}}, Value: aws.Float64(20), Unit: types.StandardUnitCount},
}))
assert.NoError(t, err)
svc.Queue(types.MetricDatum{MetricName: aws.String("NumGoats"), Value: aws.Float64(10), Unit: types.StandardUnitCount})
svc.Queue(types.MetricDatum{MetricName: aws.String("NumSheep"), Value: aws.Float64(20), Unit: types.StandardUnitCount})
assert.Equal(t, 0, svc.Client.(*cwatch.DevClient).CallCount()) // not sent yet

time.Sleep(time.Millisecond * 200)

assert.Equal(t, 1, svc.Client.(*cwatch.DevClient).CallCount()) // sent as one call

// test queuing metrics to be sent by batching process
svc.Queue(types.MetricDatum{MetricName: aws.String("SleepTime"), Value: aws.Float64(30), Unit: types.StandardUnitSeconds})

svc.StopQueue()
wg.Wait()

// check the queued metric was sent
assert.Equal(t, 2, svc.Client.(*cwatch.DevClient).CallCount())

// create service for prod environment
svc, err = cwatch.NewService("root", "key", "us-east-1", "Foo", "prod")
assert.NoError(t, err)
assert.NotNil(t, svc)
}

0 comments on commit 4c2c887

Please sign in to comment.