Skip to content

Commit

Permalink
CloudwatchService.Stop should wait for batcher
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Dec 16, 2024
1 parent fdea1bc commit 2b413a2
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 7 deletions.
11 changes: 8 additions & 3 deletions aws/cwatch/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ type Service struct {
Client Client
namespace string
deployment string
batcher *syncx.Batcher[types.MetricDatum]

batcher *syncx.Batcher[types.MetricDatum]
batcherWG *sync.WaitGroup
}

// NewService creates a new Cloudwatch service with the given credentials and configuration. Some behaviours depend on
Expand All @@ -41,11 +43,13 @@ func NewService(accessKey, secretKey, region, namespace, deployment string) (*Se
return &Service{Client: client, namespace: namespace, deployment: deployment}, nil
}

func (s *Service) StartQueue(wg *sync.WaitGroup, maxAge time.Duration) {
func (s *Service) StartQueue(maxAge time.Duration) {
if s.batcher != nil {
panic("queue already started")
}
s.batcher = syncx.NewBatcher(s.processBatch, 100, maxAge, 1000, wg)

s.batcherWG = &sync.WaitGroup{}
s.batcher = syncx.NewBatcher(s.processBatch, 100, maxAge, 1000, s.batcherWG)
s.batcher.Start()
}

Expand All @@ -54,6 +58,7 @@ func (s *Service) StopQueue() {
panic("queue wasn't started")
}
s.batcher.Stop()
s.batcherWG.Wait()
}

func (s *Service) Queue(data ...types.MetricDatum) {
Expand Down
5 changes: 1 addition & 4 deletions aws/cwatch/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cwatch_test

import (
"context"
"sync"
"testing"
"time"

Expand All @@ -29,8 +28,7 @@ func TestService(t *testing.T) {
svc, err = cwatch.NewService("root", "key", "us-east-1", "Foo", "dev")
assert.NoError(t, err)

wg := &sync.WaitGroup{}
svc.StartQueue(wg, time.Millisecond*100)
svc.StartQueue(time.Millisecond * 100)

svc.Queue(cwatch.Datum("NumGoats", 10, types.StandardUnitCount, cwatch.Dimension("Host", "foo1")))
svc.Queue(cwatch.Datum("NumSheep", 20, types.StandardUnitCount))
Expand All @@ -43,7 +41,6 @@ func TestService(t *testing.T) {
svc.Queue(cwatch.Datum("SleepTime", 30, types.StandardUnitSeconds))

svc.StopQueue()
wg.Wait()

// check the queued metric was sent
assert.Equal(t, 2, svc.Client.(*cwatch.DevClient).CallCount())
Expand Down

0 comments on commit 2b413a2

Please sign in to comment.