From 2b413a243e04e0773ecef9d173c1b1104031e7ca Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Mon, 16 Dec 2024 17:34:33 -0500 Subject: [PATCH] CloudwatchService.Stop should wait for batcher --- aws/cwatch/service.go | 11 ++++++++--- aws/cwatch/service_test.go | 5 +---- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/aws/cwatch/service.go b/aws/cwatch/service.go index 3d318fd..dadefb3 100644 --- a/aws/cwatch/service.go +++ b/aws/cwatch/service.go @@ -17,7 +17,9 @@ type Service struct { Client Client namespace string deployment string - batcher *syncx.Batcher[types.MetricDatum] + + batcher *syncx.Batcher[types.MetricDatum] + batcherWG *sync.WaitGroup } // NewService creates a new Cloudwatch service with the given credentials and configuration. Some behaviours depend on @@ -41,11 +43,13 @@ func NewService(accessKey, secretKey, region, namespace, deployment string) (*Se return &Service{Client: client, namespace: namespace, deployment: deployment}, nil } -func (s *Service) StartQueue(wg *sync.WaitGroup, maxAge time.Duration) { +func (s *Service) StartQueue(maxAge time.Duration) { if s.batcher != nil { panic("queue already started") } - s.batcher = syncx.NewBatcher(s.processBatch, 100, maxAge, 1000, wg) + + s.batcherWG = &sync.WaitGroup{} + s.batcher = syncx.NewBatcher(s.processBatch, 100, maxAge, 1000, s.batcherWG) s.batcher.Start() } @@ -54,6 +58,7 @@ func (s *Service) StopQueue() { panic("queue wasn't started") } s.batcher.Stop() + s.batcherWG.Wait() } func (s *Service) Queue(data ...types.MetricDatum) { diff --git a/aws/cwatch/service_test.go b/aws/cwatch/service_test.go index 291875d..e925232 100644 --- a/aws/cwatch/service_test.go +++ b/aws/cwatch/service_test.go @@ -2,7 +2,6 @@ package cwatch_test import ( "context" - "sync" "testing" "time" @@ -29,8 +28,7 @@ func TestService(t *testing.T) { svc, err = cwatch.NewService("root", "key", "us-east-1", "Foo", "dev") assert.NoError(t, err) - wg := &sync.WaitGroup{} - svc.StartQueue(wg, time.Millisecond*100) + svc.StartQueue(time.Millisecond * 100) svc.Queue(cwatch.Datum("NumGoats", 10, types.StandardUnitCount, cwatch.Dimension("Host", "foo1"))) svc.Queue(cwatch.Datum("NumSheep", 20, types.StandardUnitCount)) @@ -43,7 +41,6 @@ func TestService(t *testing.T) { svc.Queue(cwatch.Datum("SleepTime", 30, types.StandardUnitSeconds)) svc.StopQueue() - wg.Wait() // check the queued metric was sent assert.Equal(t, 2, svc.Client.(*cwatch.DevClient).CallCount())