Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flaky aggregator shutdown test. #619

Merged
merged 2 commits into from
Oct 21, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion plugins/outputs/cloudwatch/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,16 @@ func newDurationAggregator(durationInSeconds time.Duration,

func (durationAgg *durationAggregator) aggregating() {
durationAgg.wg.Add(1)
// sleep for some time until next round duration from now.
// Sleep to align the interval to the wall clock.
// This initial sleep is not interrupted if the aggregator gets shutdown.
now := time.Now()
time.Sleep(now.Truncate(durationAgg.aggregationDuration).Add(durationAgg.aggregationDuration).Sub(now))
durationAgg.ticker = time.NewTicker(durationAgg.aggregationDuration)
defer durationAgg.ticker.Stop()
for {
// There is no priority to select{}.
// If there is a new metric AND the shutdownChan is closed when this
// loop begins, then the behavior is random.
select {
case m := <-durationAgg.aggregationChan:
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html
Expand Down
27 changes: 20 additions & 7 deletions plugins/outputs/cloudwatch/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,19 +136,32 @@ func TestAggregator_ShutdownBehavior(t *testing.T) {
// verify the remaining metrics can be read after shutdown
// the metrics should be available immediately after the shutdown even before aggregation period
aggregationInterval := 2 * time.Second
tags := map[string]string{"d1key": "d1value", "d2key": "d2value", aggregationIntervalTagKey: aggregationInterval.String()}
tags := map[string]string{
"d1key": "d1value",
"d2key": "d2value",
aggregationIntervalTagKey: aggregationInterval.String()}
fields := map[string]interface{}{"value": 1}
timestamp := time.Now()
m := metric.New(metricName, tags, fields, timestamp)
aggregator.AddMetric(m)

//give some time to aggregation to do the work
time.Sleep(time.Second * 2)

// The Aggregator creates a new durationAggregator for each metric.
// And there is a delay when each new durationAggregator begins.
// So submit a metric and wait for the first aggregation to occur.
time.Sleep(aggregationInterval + time.Second)
adam-mateen marked this conversation as resolved.
Show resolved Hide resolved
assertMetricContent(t, metricChan, 1*time.Second, m, expectedFieldContent{
"value", 1, 1, 1, 1, "", []float64{1.0488088481701516}, []float64{1}})
assertNoMetricsInChan(t, metricChan)
// Now submit the same metric and it should be routed to the existing
// durationAggregator without delay.
timestamp = time.Now()
m = metric.New(metricName, tags, fields, timestamp)
aggregator.AddMetric(m)
// Shutdown before the aggregationInterval
time.Sleep(aggregationInterval / 2)
close(shutdownChan)
wg.Wait()

assertMetricContent(t, metricChan, 1*time.Second, m, expectedFieldContent{"value", 1, 1, 1, 1, "", []float64{1.0488088481701516}, []float64{1}})
assertMetricContent(t, metricChan, 1*time.Second, m, expectedFieldContent{
"value", 1, 1, 1, 1, "", []float64{1.0488088481701516}, []float64{1}})
assertNoMetricsInChan(t, metricChan)
}

Expand Down