Skip to content

Commit

Permalink
Merge pull request #149 from nyaruka/cloudwatch_dev
Browse files Browse the repository at this point in the history
For dev deployments, cloudwatch service should just log to console
  • Loading branch information
rowanseymour authored Dec 13, 2024
2 parents c430b1e + bbb3bdc commit d7edd1a
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 13 deletions.
41 changes: 41 additions & 0 deletions aws/cwatch/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package cwatch

import (
"context"
"log/slog"
"strings"
"sync/atomic"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
"github.com/aws/smithy-go/middleware"
)

// Client is the interface for a Cloudwatch client that can only send metrics
type Client interface {
PutMetricData(ctx context.Context, params *cloudwatch.PutMetricDataInput, optFns ...func(*cloudwatch.Options)) (*cloudwatch.PutMetricDataOutput, error)
}

type DevClient struct {
callCount atomic.Int32
}

func (c *DevClient) PutMetricData(ctx context.Context, params *cloudwatch.PutMetricDataInput, optFns ...func(*cloudwatch.Options)) (*cloudwatch.PutMetricDataOutput, error) {
// log each metric being "sent"
for _, md := range params.MetricData {
log := slog.With("namespace", aws.ToString(params.Namespace))

for _, dim := range md.Dimensions {
log = log.With(strings.ToLower(aws.ToString(dim.Name)), aws.ToString(dim.Value))
}
log.With("metric", aws.ToString(md.MetricName), "value", aws.ToFloat64(md.Value), "unit", md.Unit).Info("put metric data")
}

c.callCount.Add(1)

return &cloudwatch.PutMetricDataOutput{ResultMetadata: middleware.Metadata{}}, nil
}

func (c *DevClient) CallCount() int {
return int(c.callCount.Load())
}
26 changes: 17 additions & 9 deletions aws/cwatch/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,39 @@ import (
)

type Service struct {
Client *cloudwatch.Client
Client Client
namespace string
deployment types.Dimension
batcher *syncx.Batcher[types.MetricDatum]
}

// NewService creates a new Cloudwatch service with the given credentials and configuration
// NewService creates a new Cloudwatch service with the given credentials and configuration. If deployment is "dev" then
// then instead of a real Cloudwatch client, the service will get a mocked version that just logs metrics.
func NewService(accessKey, secretKey, region, namespace, deployment string) (*Service, error) {
cfg, err := awsx.NewConfig(accessKey, secretKey, region)
if err != nil {
return nil, err
var client Client

if deployment == "dev" {
client = &DevClient{}
} else {
cfg, err := awsx.NewConfig(accessKey, secretKey, region)
if err != nil {
return nil, err
}
client = cloudwatch.NewFromConfig(cfg)
}

return &Service{
Client: cloudwatch.NewFromConfig(cfg),
Client: client,
namespace: namespace,
deployment: types.Dimension{Name: aws.String("Deployment"), Value: aws.String(deployment)},
}, nil
}

func (s *Service) StartQueue(wg *sync.WaitGroup) {
func (s *Service) StartQueue(wg *sync.WaitGroup, maxAge time.Duration) {
if s.batcher != nil {
panic("queue already started")
}
s.batcher = syncx.NewBatcher(s.processBatch, 100, time.Second*3, 1000, wg)
s.batcher = syncx.NewBatcher(s.processBatch, 100, maxAge, 1000, wg)
s.batcher.Start()
}

Expand Down Expand Up @@ -68,6 +76,6 @@ func (s *Service) Prepare(data []types.MetricDatum) *cloudwatch.PutMetricDataInp
func (s *Service) processBatch(batch []types.MetricDatum) {
_, err := s.Client.PutMetricData(context.TODO(), s.Prepare(batch))
if err != nil {
slog.Error("error sending metrics to cloudwatch", "error", err, "count", len(batch))
slog.Error("error sending metric data batch", "error", err, "count", len(batch))
}
}
24 changes: 20 additions & 4 deletions aws/cwatch/service_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package cwatch_test

import (
"context"
"sync"
"testing"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
Expand All @@ -12,7 +14,7 @@ import (
)

func TestService(t *testing.T) {
svc, err := cwatch.NewService("root", "key", "us-east-1", "Foo", "testing")
svc, err := cwatch.NewService("root", "key", "us-east-1", "Foo", "dev")
assert.NoError(t, err)

assert.Equal(t, &cloudwatch.PutMetricDataInput{
Expand All @@ -21,14 +23,14 @@ func TestService(t *testing.T) {
{
MetricName: aws.String("NumGoats"),
Dimensions: []types.Dimension{
{Name: aws.String("Deployment"), Value: aws.String("testing")},
{Name: aws.String("Deployment"), Value: aws.String("dev")},
},
Value: aws.Float64(10),
},
{
MetricName: aws.String("NumSheep"),
Dimensions: []types.Dimension{
{Name: aws.String("Deployment"), Value: aws.String("testing")},
{Name: aws.String("Deployment"), Value: aws.String("dev")},
{Name: aws.String("Host"), Value: aws.String("foo1")},
},
Value: aws.Float64(20),
Expand All @@ -40,7 +42,21 @@ func TestService(t *testing.T) {
}))

wg := &sync.WaitGroup{}
svc.StartQueue(wg)
svc.StartQueue(wg, time.Millisecond*100)

// test writing metrics directly via the client
_, err = svc.Client.PutMetricData(context.Background(), svc.Prepare([]types.MetricDatum{
{MetricName: aws.String("NumGoats"), Value: aws.Float64(10), Unit: types.StandardUnitCount},
{MetricName: aws.String("NumSheep"), Dimensions: []types.Dimension{{Name: aws.String("Host"), Value: aws.String("foo1")}}, Value: aws.Float64(20), Unit: types.StandardUnitCount},
}))
assert.NoError(t, err)

// test queuing metrics to be sent by batching process
svc.Queue(types.MetricDatum{MetricName: aws.String("SleepTime"), Value: aws.Float64(30), Unit: types.StandardUnitSeconds})

svc.StopQueue()
wg.Wait()

// check the queued metric was sent
assert.Equal(t, 2, svc.Client.(*cwatch.DevClient).CallCount())
}

0 comments on commit d7edd1a

Please sign in to comment.