Skip to content

Commit

Permalink
fix race condition with dirty set
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanhall07 committed Nov 2, 2021
1 parent 8c07727 commit 30003f1
Show file tree
Hide file tree
Showing 6 changed files with 288 additions and 197 deletions.
102 changes: 53 additions & 49 deletions src/aggregator/aggregator/counter_elem_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion src/aggregator/aggregator/elem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2270,7 +2270,6 @@ func TestGaugeElemReset(t *testing.T) {
require.Equal(t, 0, len(f.consumedValues))
}


// Consume all values.
localFn, localRes = testFlushLocalMetricFn()
forwardFn, forwardRes = testFlushForwardedMetricFn()
Expand Down
76 changes: 76 additions & 0 deletions src/aggregator/aggregator/entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"testing"
"time"

"github.com/uber-go/tally"

"github.com/m3db/m3/src/aggregator/runtime"
"github.com/m3db/m3/src/metrics/aggregation"
"github.com/m3db/m3/src/metrics/metadata"
Expand Down Expand Up @@ -1071,6 +1073,80 @@ func TestAddUntimed_ResendEnabled(t *testing.T) {
require.True(t, ok)
}

func TestAddUntimed_ClosedAggregation(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

e, _, _ := testEntry(ctrl, testEntryOptions{})

metadatas := metadata.StagedMetadatas{
{
Metadata: metadata.Metadata{
Pipelines: []metadata.PipelineMetadata{
{
ResendEnabled: false,
StoragePolicies: policy.StoragePolicies{
testStoragePolicy,
},
},
},
},
},
}
resolution := testStoragePolicy.Resolution().Window

// add an aggregation
require.NoError(t, e.addUntimed(testGauge, metadatas))
require.Len(t, e.aggregations, 1)
agg := e.aggregations[0]
require.False(t, agg.resendEnabled)
elem := agg.elem.Value.(*GaugeElem)
vals := elem.values
require.Len(t, vals, 1)
t1 := xtime.ToUnixNano(e.nowFn().Truncate(resolution))
require.NotNil(t, vals[t1].lockedAgg)

// consume the aggregation.
t2 := t1.Add(resolution)
require.False(t, consume(elem, t2))
// the agg is now closed but still exists.
require.True(t, vals[t1].lockedAgg.closed)
require.Empty(t, elem.dirty)

// adding to a closed aggregation rolls forward to the next aggregation.
require.NoError(t, e.addUntimed(testGauge, metadatas))
require.Len(t, e.aggregations, 1)
require.Len(t, vals, 2)
require.NotNil(t, vals[t2].lockedAgg)

// consume next aggregation
t3 := t2.Add(resolution)
require.False(t, consume(elem, t3))
// the second agg is now closed but still exists.
require.True(t, vals[t2].lockedAgg.closed)
// the first agg is GC'd
require.Nil(t, vals[t1].lockedAgg)
// attempting to add to a closed aggregation leaks a dirty entry
require.Len(t, elem.dirty, 1)
require.Equal(t, t1, elem.dirty[0])

// consume again to clear the leaked dirty entry
require.False(t, consume(elem, t3.Add(resolution)))
require.True(t, vals[t2].lockedAgg.closed)
require.Empty(t, elem.dirty)
}

func consume(elem metricElem, targetTime xtime.UnixNano) bool {
l := &baseMetricList{
metrics: newMetricListMetrics(tally.NewTestScope("", nil)),
}
return elem.Consume(
int64(targetTime),
isStandardMetricEarlierThan, standardMetricTimestampNanos, standardMetricTargetNanos,
l.discardLocalMetric, l.discardForwardedMetric, l.onForwardingElemDiscarded,
0, discardType)
}

func TestShouldUpdateStagedMetadataWithLock(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
Loading

0 comments on commit 30003f1

Please sign in to comment.