Skip to content

Commit

Permalink
Merge pull request #147 from nyaruka/cwatch_batcher_optional
Browse files Browse the repository at this point in the history
Make batch queue processing optional on cloudwatch service
  • Loading branch information
rowanseymour authored Dec 12, 2024
2 parents 4b55d8c + df2f8d4 commit e4a12eb
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 15 deletions.
20 changes: 12 additions & 8 deletions aws/cwatch/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,31 @@ type Service struct {
}

// NewService creates a new Cloudwatch service with the given credentials and configuration
func NewService(accessKey, secretKey, region, namespace, deployment string, wg *sync.WaitGroup) (*Service, error) {
func NewService(accessKey, secretKey, region, namespace, deployment string) (*Service, error) {
cfg, err := awsx.NewConfig(accessKey, secretKey, region)
if err != nil {
return nil, err
}

s := &Service{
return &Service{
Client: cloudwatch.NewFromConfig(cfg),
namespace: namespace,
deployment: types.Dimension{Name: aws.String("Deployment"), Value: aws.String(deployment)},
}
s.batcher = syncx.NewBatcher(s.processBatch, 100, time.Second*3, 1000, wg)

return s, nil
}, nil
}

func (s *Service) Start() {
func (s *Service) StartQueue(wg *sync.WaitGroup) {
if s.batcher != nil {
panic("queue already started")
}
s.batcher = syncx.NewBatcher(s.processBatch, 100, time.Second*3, 1000, wg)
s.batcher.Start()
}

func (s *Service) Stop() {
func (s *Service) StopQueue() {
if s.batcher == nil {
panic("queue wasn't started")
}
s.batcher.Stop()
}

Expand Down
11 changes: 4 additions & 7 deletions aws/cwatch/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ import (
)

func TestService(t *testing.T) {
wg := &sync.WaitGroup{}

svc, err := cwatch.NewService("root", "key", "us-east-1", "Foo", "testing", wg)
svc, err := cwatch.NewService("root", "key", "us-east-1", "Foo", "testing")
assert.NoError(t, err)

assert.Equal(t, &cloudwatch.PutMetricDataInput{
Expand All @@ -41,9 +39,8 @@ func TestService(t *testing.T) {
{MetricName: aws.String("NumSheep"), Dimensions: []types.Dimension{{Name: aws.String("Host"), Value: aws.String("foo1")}}, Value: aws.Float64(20)},
}))

svc.Start()

svc.Stop()

wg := &sync.WaitGroup{}
svc.StartQueue(wg)
svc.StopQueue()
wg.Wait()
}

0 comments on commit e4a12eb

Please sign in to comment.