Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add special handling deployment=test in cloudwatch service #150

Merged
merged 1 commit into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 24 additions & 15 deletions aws/cwatch/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@ import (
type Service struct {
Client Client
namespace string
deployment types.Dimension
deployment string
batcher *syncx.Batcher[types.MetricDatum]
}

// NewService creates a new Cloudwatch service with the given credentials and configuration. If deployment is "dev" then
// then instead of a real Cloudwatch client, the service will get a mocked version that just logs metrics.
// NewService creates a new Cloudwatch service with the given credentials and configuration. Some behaviours depend on
// the given deployment value:
// - "test": metrics just logged, Queue(..) sends synchronously
// - "dev": metrics just logged, Queue(..) adds to batcher
// - "*": metrics sent to Cloudwatch, Queue(..) adds to batcher
func NewService(accessKey, secretKey, region, namespace, deployment string) (*Service, error) {
var client Client

if deployment == "dev" {
if deployment == "dev" || deployment == "test" {
client = &DevClient{}
} else {
cfg, err := awsx.NewConfig(accessKey, secretKey, region)
Expand All @@ -35,11 +38,7 @@ func NewService(accessKey, secretKey, region, namespace, deployment string) (*Se
client = cloudwatch.NewFromConfig(cfg)
}

return &Service{
Client: client,
namespace: namespace,
deployment: types.Dimension{Name: aws.String("Deployment"), Value: aws.String(deployment)},
}, nil
return &Service{Client: client, namespace: namespace, deployment: deployment}, nil
}

func (s *Service) StartQueue(wg *sync.WaitGroup, maxAge time.Duration) {
Expand All @@ -57,14 +56,25 @@ func (s *Service) StopQueue() {
s.batcher.Stop()
}

func (s *Service) Queue(d types.MetricDatum) {
s.batcher.Queue(d)
func (s *Service) Queue(data ...types.MetricDatum) {
if s.deployment == "test" {
s.Send(context.TODO(), data...)
} else {
for _, d := range data {
s.batcher.Queue(d)
}
}
}

func (s *Service) Send(ctx context.Context, data ...types.MetricDatum) error {
_, err := s.Client.PutMetricData(ctx, s.prepare(data))
return err
}

func (s *Service) Prepare(data []types.MetricDatum) *cloudwatch.PutMetricDataInput {
func (s *Service) prepare(data []types.MetricDatum) *cloudwatch.PutMetricDataInput {
// add deployment as the first dimension to all metrics
for i := range data {
data[i].Dimensions = append([]types.Dimension{s.deployment}, data[i].Dimensions...)
data[i].Dimensions = append([]types.Dimension{{Name: aws.String("Deployment"), Value: aws.String(s.deployment)}}, data[i].Dimensions...)
}

return &cloudwatch.PutMetricDataInput{
Expand All @@ -74,8 +84,7 @@ func (s *Service) Prepare(data []types.MetricDatum) *cloudwatch.PutMetricDataInp
}

func (s *Service) processBatch(batch []types.MetricDatum) {
_, err := s.Client.PutMetricData(context.TODO(), s.Prepare(batch))
if err != nil {
if err := s.Send(context.TODO(), batch...); err != nil {
slog.Error("error sending metric data batch", "error", err, "count", len(batch))
}
}
57 changes: 25 additions & 32 deletions aws/cwatch/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,56 +7,49 @@ import (
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/stretchr/testify/assert"
)

func TestService(t *testing.T) {
svc, err := cwatch.NewService("root", "key", "us-east-1", "Foo", "dev")
// create service for test environment
svc, err := cwatch.NewService("root", "key", "us-east-1", "Foo", "test")
assert.NoError(t, err)

assert.Equal(t, &cloudwatch.PutMetricDataInput{
Namespace: aws.String("Foo"),
MetricData: []types.MetricDatum{
{
MetricName: aws.String("NumGoats"),
Dimensions: []types.Dimension{
{Name: aws.String("Deployment"), Value: aws.String("dev")},
},
Value: aws.Float64(10),
},
{
MetricName: aws.String("NumSheep"),
Dimensions: []types.Dimension{
{Name: aws.String("Deployment"), Value: aws.String("dev")},
{Name: aws.String("Host"), Value: aws.String("foo1")},
},
Value: aws.Float64(20),
},
},
}, svc.Prepare([]types.MetricDatum{
{MetricName: aws.String("NumGoats"), Value: aws.Float64(10)},
{MetricName: aws.String("NumSheep"), Dimensions: []types.Dimension{{Name: aws.String("Host"), Value: aws.String("foo1")}}, Value: aws.Float64(20)},
}))
err = svc.Send(context.Background(), types.MetricDatum{MetricName: aws.String("NumSheep"), Dimensions: []types.Dimension{{Name: aws.String("Host"), Value: aws.String("foo1")}}, Value: aws.Float64(20)})
assert.NoError(t, err)
assert.Equal(t, 1, svc.Client.(*cwatch.DevClient).CallCount())

// check Queue sends synchronously
svc.Queue(types.MetricDatum{MetricName: aws.String("NumGoats"), Value: aws.Float64(10), Unit: types.StandardUnitCount})
assert.Equal(t, 2, svc.Client.(*cwatch.DevClient).CallCount())

// create service for dev environment
svc, err = cwatch.NewService("root", "key", "us-east-1", "Foo", "dev")
assert.NoError(t, err)

wg := &sync.WaitGroup{}
svc.StartQueue(wg, time.Millisecond*100)

// test writing metrics directly via the client
_, err = svc.Client.PutMetricData(context.Background(), svc.Prepare([]types.MetricDatum{
{MetricName: aws.String("NumGoats"), Value: aws.Float64(10), Unit: types.StandardUnitCount},
{MetricName: aws.String("NumSheep"), Dimensions: []types.Dimension{{Name: aws.String("Host"), Value: aws.String("foo1")}}, Value: aws.Float64(20), Unit: types.StandardUnitCount},
}))
assert.NoError(t, err)
svc.Queue(types.MetricDatum{MetricName: aws.String("NumGoats"), Value: aws.Float64(10), Unit: types.StandardUnitCount})
svc.Queue(types.MetricDatum{MetricName: aws.String("NumSheep"), Value: aws.Float64(20), Unit: types.StandardUnitCount})
assert.Equal(t, 0, svc.Client.(*cwatch.DevClient).CallCount()) // not sent yet

time.Sleep(time.Millisecond * 200)

assert.Equal(t, 1, svc.Client.(*cwatch.DevClient).CallCount()) // sent as one call

// test queuing metrics to be sent by batching process
svc.Queue(types.MetricDatum{MetricName: aws.String("SleepTime"), Value: aws.Float64(30), Unit: types.StandardUnitSeconds})

svc.StopQueue()
wg.Wait()

// check the queued metric was sent
assert.Equal(t, 2, svc.Client.(*cwatch.DevClient).CallCount())

// create service for prod environment
svc, err = cwatch.NewService("root", "key", "us-east-1", "Foo", "prod")
assert.NoError(t, err)
assert.NotNil(t, svc)
}
Loading