From 30003f1194d63e46f6c20856999cd358de465768 Mon Sep 17 00:00:00 2001 From: Ryan Hall Date: Tue, 2 Nov 2021 08:29:36 -0700 Subject: [PATCH] fix race condition with dirty set --- src/aggregator/aggregator/counter_elem_gen.go | 102 +++++++++--------- src/aggregator/aggregator/elem_test.go | 1 - src/aggregator/aggregator/entry_test.go | 76 +++++++++++++ src/aggregator/aggregator/gauge_elem_gen.go | 102 +++++++++--------- src/aggregator/aggregator/generic_elem.go | 102 +++++++++--------- src/aggregator/aggregator/timer_elem_gen.go | 102 +++++++++--------- 6 files changed, 288 insertions(+), 197 deletions(-) diff --git a/src/aggregator/aggregator/counter_elem_gen.go b/src/aggregator/aggregator/counter_elem_gen.go index 178d8100ca..82bfe169b3 100644 --- a/src/aggregator/aggregator/counter_elem_gen.go +++ b/src/aggregator/aggregator/counter_elem_gen.go @@ -38,7 +38,6 @@ import ( "github.com/m3db/m3/src/x/instrument" xtime "github.com/m3db/m3/src/x/time" - "github.com/scaleway/scaleway-sdk-go/logger" "github.com/willf/bitset" "go.uber.org/zap" ) @@ -122,9 +121,11 @@ func (e *CounterElem) ResetSetData(data ElemData) error { // AddUnion adds a metric value union at a given timestamp. func (e *CounterElem) AddUnion(timestamp time.Time, mu unaggregated.MetricUnion, resendEnabled bool) error { - return e.doAddUnion(timestamp, mu, resendEnabled, false, timestamp) + return e.doAddUnion(timestamp, mu, resendEnabled, false) } -func (e *CounterElem) doAddUnion(timestamp time.Time, mu unaggregated.MetricUnion, resendEnabled bool, retry bool, initialTimestamp time.Time) error { + +func (e *CounterElem) doAddUnion(timestamp time.Time, mu unaggregated.MetricUnion, resendEnabled bool, retry bool, +) error { alignedStart := timestamp.Truncate(e.sp.Resolution().Window) lockedAgg, err := e.findOrCreate(alignedStart.UnixNano(), createAggregationOptions{ resendEnabled: resendEnabled, @@ -134,14 +135,14 @@ func (e *CounterElem) doAddUnion(timestamp time.Time, mu unaggregated.MetricUnio } lockedAgg.Lock() if lockedAgg.closed { + // Note: this might have created an entry in the dirty set for lockedAgg when calling findOrCreate, even though + // it's already closed. The Consume loop will detect this and clean it up. lockedAgg.Unlock() if !resendEnabled && !retry { - return e.doAddUnion(alignedStart.Add(e.sp.Resolution().Window), mu, false, true, alignedStart) + // handle the edge case where the aggregation was already flushed/closed because the current time is right + // at the boundary. just roll the untimed metric into the next aggregation. + return e.doAddUnion(alignedStart.Add(e.sp.Resolution().Window), mu, false, true) } - logger.Errorf("aggregation already closed", - zap.Time("timestamp", timestamp), - zap.Time("initialTimestamp", initialTimestamp), - zap.Bool("retry", retry)) return errAggregationClosed } lockedAgg.aggregation.AddUnion(timestamp, mu) @@ -230,10 +231,8 @@ func (e *CounterElem) expireValuesWithLock( } resolution := e.sp.Resolution().Window - // start after the minimum to ensure we always keep at least one value in the map for binary transformations. - currStart := e.minStartTime.Add(resolution) + currStart := e.minStartTime resendExpire := targetNanos - int64(e.bufferForPastTimedMetricFn(resolution)) - for isEarlierThanFn(int64(currStart), resolution, targetNanos) { if currV, ok := e.values[currStart]; ok { if currV.resendEnabled { @@ -257,7 +256,9 @@ func (e *CounterElem) expireValuesWithLock( // if this current value is closed and clean it will no longer be flushed. this means it's safe // to remove the previous value since it will no longer be needed for binary transformations. when the // next value is eligible to be expired, this current value will actually be removed. - if prevV, ok := e.values[e.minStartTime]; ok { + // if we're currently pointing at the start skip this there is no previous for the start. this ensures + // we always keep at least one value in the map for binary transformations. + if prevV, ok := e.values[e.minStartTime]; ok && currStart != e.minStartTime { // can't expire flush state until after the flushing, so we save the time to expire later. e.flushStateToExpire = append(e.flushStateToExpire, e.minStartTime) delete(e.values, e.minStartTime) @@ -357,8 +358,26 @@ func (e *CounterElem) Consume( dirtyTimes := e.dirty e.dirty = e.dirty[:0] for i, dirtyTime := range dirtyTimes { - flushState, ready := e.updateFlushStateWithLock(dirtyTime, targetNanos, isEarlierThanFn) - if !ready || !flushState.dirty { + if !isEarlierThanFn(int64(dirtyTime), resolution, targetNanos) { + // not ready yet + e.dirty = append(e.dirty, dirtyTime) + continue + } + agg, ok := e.values[dirtyTime] + if !ok { + // there is a race where a writer adds a closed aggregation to the dirty set. eventually the closed + // aggregation is expired and removed from the values map. ok to skip. + continue + } + + flushState := e.newFlushStateWithLock(agg) + if flushState.dirty && flushState.flushed && !flushState.resendEnabled { + instrument.EmitAndLogInvariantViolation(e.opts.InstrumentOptions(), func(l *zap.Logger) { + l.Error("reflushing aggregation without resendEnabled", zap.Any("flushState", flushState)) + }) + } + + if !flushState.dirty { // there is a race where the value was added to the dirty set, but the writer didn't actually update the // value yet (by marking dirty). add back to the dirty set so it can be processed in the next round once // the value has been updated. @@ -370,13 +389,16 @@ func (e *CounterElem) Consume( e.values[dirtyTime] = val e.toConsume = append(e.toConsume, flushState) - // potentially consume the nextAgg as well in case we need to cascade an update due to a resend. - nextAgg, ok := e.nextAggWithLock(dirtyTime, targetNanos, isEarlierThanFn) - if ok && (i == len(dirtyTimes)-1 || dirtyTimes[i+1] != nextAgg.startAtNanos) { - // if not already in the dirty set. - flushState, ready := e.updateFlushStateWithLock(nextAgg.startAtNanos, targetNanos, isEarlierThanFn) - if ready { - e.toConsume = append(e.toConsume, flushState) + // potentially consume the nextAgg as well in case we need to cascade an update from a previously flushed + // value. + if flushState.flushed { + nextAgg, ok := e.nextAggWithLock(dirtyTime, targetNanos, isEarlierThanFn) + // add if not already in the dirty set. + if ok && + isEarlierThanFn(int64(nextAgg.startAtNanos), resolution, targetNanos) && + // at the end of the dirty times OR the next dirty time does not match. + (i == len(dirtyTimes)-1 || dirtyTimes[i+1] != nextAgg.startAtNanos) { + e.toConsume = append(e.toConsume, e.newFlushStateWithLock(nextAgg)) } } } @@ -412,23 +434,9 @@ func (e *CounterElem) Consume( return canCollect } -func (e *CounterElem) updateFlushStateWithLock(dirtyTime xtime.UnixNano, targetNanos int64, - isEarlierThanFn isEarlierThanFn) (aggFlushState, bool) { - resolution := e.sp.Resolution().Window - agg, ok := e.values[dirtyTime] - if !ok { - ts := dirtyTime.ToTime() - instrument.EmitAndLogInvariantViolation(e.opts.InstrumentOptions(), func(l *zap.Logger) { - l.Error("dirty timestamp not in values map", zap.Time("ts", ts)) - }) - return aggFlushState{}, false - } - if !isEarlierThanFn(int64(dirtyTime), resolution, targetNanos) { - return aggFlushState{}, false - } - +func (e *CounterElem) newFlushStateWithLock(agg timedCounter) aggFlushState { // note: flushState might be empty for the first flush - flushState := e.flushState[dirtyTime] + flushState := e.flushState[agg.startAtNanos] // copy the lockedAgg data to the flushState while holding the lock. agg.lockedAgg.Lock() flushState.dirty = agg.lockedAgg.dirty @@ -442,15 +450,15 @@ func (e *CounterElem) updateFlushStateWithLock(dirtyTime xtime.UnixNano, targetN agg.lockedAgg.Unlock() // update the flushState with everything else. - previousStartAligned, ok := e.previousStartAlignedWithLock(dirtyTime) + previousStartAligned, ok := e.previousStartAlignedWithLock(agg.startAtNanos) if ok { flushState.prevStartTime = previousStartAligned } else { flushState.prevStartTime = 0 } flushState.resendEnabled = agg.resendEnabled - flushState.startAt = dirtyTime - return flushState, true + flushState.startAt = agg.startAtNanos + return flushState } // Close closes the element. @@ -674,11 +682,9 @@ func (e *CounterElem) processValue( l.Error("previous start time not in state map", zap.Time("ts", ts)) }) - } else if prevFlushState.consumedValues != nil { - // prev consumedValues may be null if the result was NaN. - prev.Value = prevFlushState.consumedValues[aggTypeIdx] - prev.TimeNanos = int64(prevFlushState.timestamp) } + prev.Value = prevFlushState.consumedValues[aggTypeIdx] + prev.TimeNanos = int64(prevFlushState.timestamp) } curr := transformation.Datapoint{ TimeNanos: int64(flushState.timestamp), @@ -690,12 +696,10 @@ func (e *CounterElem) processValue( // We currently only support first-order derivative transformations so we only // need to keep one value. In the future if we need to support higher-order // derivative transformations, we need to store an array of values here. - if !math.IsNaN(curr.Value) { - if flushState.consumedValues == nil { - flushState.consumedValues = make([]float64, len(e.aggTypes)) - } - flushState.consumedValues[aggTypeIdx] = curr.Value + if flushState.consumedValues == nil { + flushState.consumedValues = make([]float64, len(e.aggTypes)) } + flushState.consumedValues[aggTypeIdx] = curr.Value value = res.Value case isUnaryMultiOp: curr := transformation.Datapoint{ diff --git a/src/aggregator/aggregator/elem_test.go b/src/aggregator/aggregator/elem_test.go index 522f078d6f..39e45b7cee 100644 --- a/src/aggregator/aggregator/elem_test.go +++ b/src/aggregator/aggregator/elem_test.go @@ -2270,7 +2270,6 @@ func TestGaugeElemReset(t *testing.T) { require.Equal(t, 0, len(f.consumedValues)) } - // Consume all values. localFn, localRes = testFlushLocalMetricFn() forwardFn, forwardRes = testFlushForwardedMetricFn() diff --git a/src/aggregator/aggregator/entry_test.go b/src/aggregator/aggregator/entry_test.go index 42d3b4a53f..ba28bb1ad4 100644 --- a/src/aggregator/aggregator/entry_test.go +++ b/src/aggregator/aggregator/entry_test.go @@ -28,6 +28,8 @@ import ( "testing" "time" + "github.com/uber-go/tally" + "github.com/m3db/m3/src/aggregator/runtime" "github.com/m3db/m3/src/metrics/aggregation" "github.com/m3db/m3/src/metrics/metadata" @@ -1071,6 +1073,80 @@ func TestAddUntimed_ResendEnabled(t *testing.T) { require.True(t, ok) } +func TestAddUntimed_ClosedAggregation(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + e, _, _ := testEntry(ctrl, testEntryOptions{}) + + metadatas := metadata.StagedMetadatas{ + { + Metadata: metadata.Metadata{ + Pipelines: []metadata.PipelineMetadata{ + { + ResendEnabled: false, + StoragePolicies: policy.StoragePolicies{ + testStoragePolicy, + }, + }, + }, + }, + }, + } + resolution := testStoragePolicy.Resolution().Window + + // add an aggregation + require.NoError(t, e.addUntimed(testGauge, metadatas)) + require.Len(t, e.aggregations, 1) + agg := e.aggregations[0] + require.False(t, agg.resendEnabled) + elem := agg.elem.Value.(*GaugeElem) + vals := elem.values + require.Len(t, vals, 1) + t1 := xtime.ToUnixNano(e.nowFn().Truncate(resolution)) + require.NotNil(t, vals[t1].lockedAgg) + + // consume the aggregation. + t2 := t1.Add(resolution) + require.False(t, consume(elem, t2)) + // the agg is now closed but still exists. + require.True(t, vals[t1].lockedAgg.closed) + require.Empty(t, elem.dirty) + + // adding to a closed aggregation rolls forward to the next aggregation. + require.NoError(t, e.addUntimed(testGauge, metadatas)) + require.Len(t, e.aggregations, 1) + require.Len(t, vals, 2) + require.NotNil(t, vals[t2].lockedAgg) + + // consume next aggregation + t3 := t2.Add(resolution) + require.False(t, consume(elem, t3)) + // the second agg is now closed but still exists. + require.True(t, vals[t2].lockedAgg.closed) + // the first agg is GC'd + require.Nil(t, vals[t1].lockedAgg) + // attempting to add to a closed aggregation leaks a dirty entry + require.Len(t, elem.dirty, 1) + require.Equal(t, t1, elem.dirty[0]) + + // consume again to clear the leaked dirty entry + require.False(t, consume(elem, t3.Add(resolution))) + require.True(t, vals[t2].lockedAgg.closed) + require.Empty(t, elem.dirty) +} + +func consume(elem metricElem, targetTime xtime.UnixNano) bool { + l := &baseMetricList{ + metrics: newMetricListMetrics(tally.NewTestScope("", nil)), + } + return elem.Consume( + int64(targetTime), + isStandardMetricEarlierThan, standardMetricTimestampNanos, standardMetricTargetNanos, + l.discardLocalMetric, l.discardForwardedMetric, l.onForwardingElemDiscarded, + 0, discardType) +} + func TestShouldUpdateStagedMetadataWithLock(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/src/aggregator/aggregator/gauge_elem_gen.go b/src/aggregator/aggregator/gauge_elem_gen.go index 14d183c188..794b89080e 100644 --- a/src/aggregator/aggregator/gauge_elem_gen.go +++ b/src/aggregator/aggregator/gauge_elem_gen.go @@ -38,7 +38,6 @@ import ( "github.com/m3db/m3/src/x/instrument" xtime "github.com/m3db/m3/src/x/time" - "github.com/scaleway/scaleway-sdk-go/logger" "github.com/willf/bitset" "go.uber.org/zap" ) @@ -122,9 +121,11 @@ func (e *GaugeElem) ResetSetData(data ElemData) error { // AddUnion adds a metric value union at a given timestamp. func (e *GaugeElem) AddUnion(timestamp time.Time, mu unaggregated.MetricUnion, resendEnabled bool) error { - return e.doAddUnion(timestamp, mu, resendEnabled, false, timestamp) + return e.doAddUnion(timestamp, mu, resendEnabled, false) } -func (e *GaugeElem) doAddUnion(timestamp time.Time, mu unaggregated.MetricUnion, resendEnabled bool, retry bool, initialTimestamp time.Time) error { + +func (e *GaugeElem) doAddUnion(timestamp time.Time, mu unaggregated.MetricUnion, resendEnabled bool, retry bool, +) error { alignedStart := timestamp.Truncate(e.sp.Resolution().Window) lockedAgg, err := e.findOrCreate(alignedStart.UnixNano(), createAggregationOptions{ resendEnabled: resendEnabled, @@ -134,14 +135,14 @@ func (e *GaugeElem) doAddUnion(timestamp time.Time, mu unaggregated.MetricUnion, } lockedAgg.Lock() if lockedAgg.closed { + // Note: this might have created an entry in the dirty set for lockedAgg when calling findOrCreate, even though + // it's already closed. The Consume loop will detect this and clean it up. lockedAgg.Unlock() if !resendEnabled && !retry { - return e.doAddUnion(alignedStart.Add(e.sp.Resolution().Window), mu, false, true, alignedStart) + // handle the edge case where the aggregation was already flushed/closed because the current time is right + // at the boundary. just roll the untimed metric into the next aggregation. + return e.doAddUnion(alignedStart.Add(e.sp.Resolution().Window), mu, false, true) } - logger.Errorf("aggregation already closed", - zap.Time("timestamp", timestamp), - zap.Time("initialTimestamp", initialTimestamp), - zap.Bool("retry", retry)) return errAggregationClosed } lockedAgg.aggregation.AddUnion(timestamp, mu) @@ -230,10 +231,8 @@ func (e *GaugeElem) expireValuesWithLock( } resolution := e.sp.Resolution().Window - // start after the minimum to ensure we always keep at least one value in the map for binary transformations. - currStart := e.minStartTime.Add(resolution) + currStart := e.minStartTime resendExpire := targetNanos - int64(e.bufferForPastTimedMetricFn(resolution)) - for isEarlierThanFn(int64(currStart), resolution, targetNanos) { if currV, ok := e.values[currStart]; ok { if currV.resendEnabled { @@ -257,7 +256,9 @@ func (e *GaugeElem) expireValuesWithLock( // if this current value is closed and clean it will no longer be flushed. this means it's safe // to remove the previous value since it will no longer be needed for binary transformations. when the // next value is eligible to be expired, this current value will actually be removed. - if prevV, ok := e.values[e.minStartTime]; ok { + // if we're currently pointing at the start skip this there is no previous for the start. this ensures + // we always keep at least one value in the map for binary transformations. + if prevV, ok := e.values[e.minStartTime]; ok && currStart != e.minStartTime { // can't expire flush state until after the flushing, so we save the time to expire later. e.flushStateToExpire = append(e.flushStateToExpire, e.minStartTime) delete(e.values, e.minStartTime) @@ -357,8 +358,26 @@ func (e *GaugeElem) Consume( dirtyTimes := e.dirty e.dirty = e.dirty[:0] for i, dirtyTime := range dirtyTimes { - flushState, ready := e.updateFlushStateWithLock(dirtyTime, targetNanos, isEarlierThanFn) - if !ready || !flushState.dirty { + if !isEarlierThanFn(int64(dirtyTime), resolution, targetNanos) { + // not ready yet + e.dirty = append(e.dirty, dirtyTime) + continue + } + agg, ok := e.values[dirtyTime] + if !ok { + // there is a race where a writer adds a closed aggregation to the dirty set. eventually the closed + // aggregation is expired and removed from the values map. ok to skip. + continue + } + + flushState := e.newFlushStateWithLock(agg) + if flushState.dirty && flushState.flushed && !flushState.resendEnabled { + instrument.EmitAndLogInvariantViolation(e.opts.InstrumentOptions(), func(l *zap.Logger) { + l.Error("reflushing aggregation without resendEnabled", zap.Any("flushState", flushState)) + }) + } + + if !flushState.dirty { // there is a race where the value was added to the dirty set, but the writer didn't actually update the // value yet (by marking dirty). add back to the dirty set so it can be processed in the next round once // the value has been updated. @@ -370,13 +389,16 @@ func (e *GaugeElem) Consume( e.values[dirtyTime] = val e.toConsume = append(e.toConsume, flushState) - // potentially consume the nextAgg as well in case we need to cascade an update due to a resend. - nextAgg, ok := e.nextAggWithLock(dirtyTime, targetNanos, isEarlierThanFn) - if ok && (i == len(dirtyTimes)-1 || dirtyTimes[i+1] != nextAgg.startAtNanos) { - // if not already in the dirty set. - flushState, ready := e.updateFlushStateWithLock(nextAgg.startAtNanos, targetNanos, isEarlierThanFn) - if ready { - e.toConsume = append(e.toConsume, flushState) + // potentially consume the nextAgg as well in case we need to cascade an update from a previously flushed + // value. + if flushState.flushed { + nextAgg, ok := e.nextAggWithLock(dirtyTime, targetNanos, isEarlierThanFn) + // add if not already in the dirty set. + if ok && + isEarlierThanFn(int64(nextAgg.startAtNanos), resolution, targetNanos) && + // at the end of the dirty times OR the next dirty time does not match. + (i == len(dirtyTimes)-1 || dirtyTimes[i+1] != nextAgg.startAtNanos) { + e.toConsume = append(e.toConsume, e.newFlushStateWithLock(nextAgg)) } } } @@ -412,23 +434,9 @@ func (e *GaugeElem) Consume( return canCollect } -func (e *GaugeElem) updateFlushStateWithLock(dirtyTime xtime.UnixNano, targetNanos int64, - isEarlierThanFn isEarlierThanFn) (aggFlushState, bool) { - resolution := e.sp.Resolution().Window - agg, ok := e.values[dirtyTime] - if !ok { - ts := dirtyTime.ToTime() - instrument.EmitAndLogInvariantViolation(e.opts.InstrumentOptions(), func(l *zap.Logger) { - l.Error("dirty timestamp not in values map", zap.Time("ts", ts)) - }) - return aggFlushState{}, false - } - if !isEarlierThanFn(int64(dirtyTime), resolution, targetNanos) { - return aggFlushState{}, false - } - +func (e *GaugeElem) newFlushStateWithLock(agg timedGauge) aggFlushState { // note: flushState might be empty for the first flush - flushState := e.flushState[dirtyTime] + flushState := e.flushState[agg.startAtNanos] // copy the lockedAgg data to the flushState while holding the lock. agg.lockedAgg.Lock() flushState.dirty = agg.lockedAgg.dirty @@ -442,15 +450,15 @@ func (e *GaugeElem) updateFlushStateWithLock(dirtyTime xtime.UnixNano, targetNan agg.lockedAgg.Unlock() // update the flushState with everything else. - previousStartAligned, ok := e.previousStartAlignedWithLock(dirtyTime) + previousStartAligned, ok := e.previousStartAlignedWithLock(agg.startAtNanos) if ok { flushState.prevStartTime = previousStartAligned } else { flushState.prevStartTime = 0 } flushState.resendEnabled = agg.resendEnabled - flushState.startAt = dirtyTime - return flushState, true + flushState.startAt = agg.startAtNanos + return flushState } // Close closes the element. @@ -674,11 +682,9 @@ func (e *GaugeElem) processValue( l.Error("previous start time not in state map", zap.Time("ts", ts)) }) - } else if prevFlushState.consumedValues != nil { - // prev consumedValues may be null if the result was NaN. - prev.Value = prevFlushState.consumedValues[aggTypeIdx] - prev.TimeNanos = int64(prevFlushState.timestamp) } + prev.Value = prevFlushState.consumedValues[aggTypeIdx] + prev.TimeNanos = int64(prevFlushState.timestamp) } curr := transformation.Datapoint{ TimeNanos: int64(flushState.timestamp), @@ -690,12 +696,10 @@ func (e *GaugeElem) processValue( // We currently only support first-order derivative transformations so we only // need to keep one value. In the future if we need to support higher-order // derivative transformations, we need to store an array of values here. - if !math.IsNaN(curr.Value) { - if flushState.consumedValues == nil { - flushState.consumedValues = make([]float64, len(e.aggTypes)) - } - flushState.consumedValues[aggTypeIdx] = curr.Value + if flushState.consumedValues == nil { + flushState.consumedValues = make([]float64, len(e.aggTypes)) } + flushState.consumedValues[aggTypeIdx] = curr.Value value = res.Value case isUnaryMultiOp: curr := transformation.Datapoint{ diff --git a/src/aggregator/aggregator/generic_elem.go b/src/aggregator/aggregator/generic_elem.go index f36394e3a0..4f41e0d85c 100644 --- a/src/aggregator/aggregator/generic_elem.go +++ b/src/aggregator/aggregator/generic_elem.go @@ -27,7 +27,6 @@ import ( "time" "github.com/mauricelam/genny/generic" - "github.com/scaleway/scaleway-sdk-go/logger" "github.com/willf/bitset" "go.uber.org/zap" @@ -185,9 +184,11 @@ func (e *GenericElem) ResetSetData(data ElemData) error { // AddUnion adds a metric value union at a given timestamp. func (e *GenericElem) AddUnion(timestamp time.Time, mu unaggregated.MetricUnion, resendEnabled bool) error { - return e.doAddUnion(timestamp, mu, resendEnabled, false, timestamp) + return e.doAddUnion(timestamp, mu, resendEnabled, false) } -func (e *GenericElem) doAddUnion(timestamp time.Time, mu unaggregated.MetricUnion, resendEnabled bool, retry bool, initialTimestamp time.Time) error { + +func (e *GenericElem) doAddUnion(timestamp time.Time, mu unaggregated.MetricUnion, resendEnabled bool, retry bool, +) error { alignedStart := timestamp.Truncate(e.sp.Resolution().Window) lockedAgg, err := e.findOrCreate(alignedStart.UnixNano(), createAggregationOptions{ resendEnabled: resendEnabled, @@ -197,14 +198,14 @@ func (e *GenericElem) doAddUnion(timestamp time.Time, mu unaggregated.MetricUnio } lockedAgg.Lock() if lockedAgg.closed { + // Note: this might have created an entry in the dirty set for lockedAgg when calling findOrCreate, even though + // it's already closed. The Consume loop will detect this and clean it up. lockedAgg.Unlock() if !resendEnabled && !retry { - return e.doAddUnion(alignedStart.Add(e.sp.Resolution().Window), mu, false, true, alignedStart) + // handle the edge case where the aggregation was already flushed/closed because the current time is right + // at the boundary. just roll the untimed metric into the next aggregation. + return e.doAddUnion(alignedStart.Add(e.sp.Resolution().Window), mu, false, true) } - logger.Errorf("aggregation already closed", - zap.Time("timestamp", timestamp), - zap.Time("initialTimestamp", initialTimestamp), - zap.Bool("retry", retry)) return errAggregationClosed } lockedAgg.aggregation.AddUnion(timestamp, mu) @@ -293,10 +294,8 @@ func (e *GenericElem) expireValuesWithLock( } resolution := e.sp.Resolution().Window - // start after the minimum to ensure we always keep at least one value in the map for binary transformations. - currStart := e.minStartTime.Add(resolution) + currStart := e.minStartTime resendExpire := targetNanos - int64(e.bufferForPastTimedMetricFn(resolution)) - for isEarlierThanFn(int64(currStart), resolution, targetNanos) { if currV, ok := e.values[currStart]; ok { if currV.resendEnabled { @@ -320,7 +319,9 @@ func (e *GenericElem) expireValuesWithLock( // if this current value is closed and clean it will no longer be flushed. this means it's safe // to remove the previous value since it will no longer be needed for binary transformations. when the // next value is eligible to be expired, this current value will actually be removed. - if prevV, ok := e.values[e.minStartTime]; ok { + // if we're currently pointing at the start skip this there is no previous for the start. this ensures + // we always keep at least one value in the map for binary transformations. + if prevV, ok := e.values[e.minStartTime]; ok && currStart != e.minStartTime { // can't expire flush state until after the flushing, so we save the time to expire later. e.flushStateToExpire = append(e.flushStateToExpire, e.minStartTime) delete(e.values, e.minStartTime) @@ -420,8 +421,26 @@ func (e *GenericElem) Consume( dirtyTimes := e.dirty e.dirty = e.dirty[:0] for i, dirtyTime := range dirtyTimes { - flushState, ready := e.updateFlushStateWithLock(dirtyTime, targetNanos, isEarlierThanFn) - if !ready || !flushState.dirty { + if !isEarlierThanFn(int64(dirtyTime), resolution, targetNanos) { + // not ready yet + e.dirty = append(e.dirty, dirtyTime) + continue + } + agg, ok := e.values[dirtyTime] + if !ok { + // there is a race where a writer adds a closed aggregation to the dirty set. eventually the closed + // aggregation is expired and removed from the values map. ok to skip. + continue + } + + flushState := e.newFlushStateWithLock(agg) + if flushState.dirty && flushState.flushed && !flushState.resendEnabled { + instrument.EmitAndLogInvariantViolation(e.opts.InstrumentOptions(), func(l *zap.Logger) { + l.Error("reflushing aggregation without resendEnabled", zap.Any("flushState", flushState)) + }) + } + + if !flushState.dirty { // there is a race where the value was added to the dirty set, but the writer didn't actually update the // value yet (by marking dirty). add back to the dirty set so it can be processed in the next round once // the value has been updated. @@ -433,13 +452,16 @@ func (e *GenericElem) Consume( e.values[dirtyTime] = val e.toConsume = append(e.toConsume, flushState) - // potentially consume the nextAgg as well in case we need to cascade an update due to a resend. - nextAgg, ok := e.nextAggWithLock(dirtyTime, targetNanos, isEarlierThanFn) - if ok && (i == len(dirtyTimes)-1 || dirtyTimes[i+1] != nextAgg.startAtNanos) { - // if not already in the dirty set. - flushState, ready := e.updateFlushStateWithLock(nextAgg.startAtNanos, targetNanos, isEarlierThanFn) - if ready { - e.toConsume = append(e.toConsume, flushState) + // potentially consume the nextAgg as well in case we need to cascade an update from a previously flushed + // value. + if flushState.flushed { + nextAgg, ok := e.nextAggWithLock(dirtyTime, targetNanos, isEarlierThanFn) + // add if not already in the dirty set. + if ok && + isEarlierThanFn(int64(nextAgg.startAtNanos), resolution, targetNanos) && + // at the end of the dirty times OR the next dirty time does not match. + (i == len(dirtyTimes)-1 || dirtyTimes[i+1] != nextAgg.startAtNanos) { + e.toConsume = append(e.toConsume, e.newFlushStateWithLock(nextAgg)) } } } @@ -475,23 +497,9 @@ func (e *GenericElem) Consume( return canCollect } -func (e *GenericElem) updateFlushStateWithLock(dirtyTime xtime.UnixNano, targetNanos int64, - isEarlierThanFn isEarlierThanFn) (aggFlushState, bool) { - resolution := e.sp.Resolution().Window - agg, ok := e.values[dirtyTime] - if !ok { - ts := dirtyTime.ToTime() - instrument.EmitAndLogInvariantViolation(e.opts.InstrumentOptions(), func(l *zap.Logger) { - l.Error("dirty timestamp not in values map", zap.Time("ts", ts)) - }) - return aggFlushState{}, false - } - if !isEarlierThanFn(int64(dirtyTime), resolution, targetNanos) { - return aggFlushState{}, false - } - +func (e *GenericElem) newFlushStateWithLock(agg timedAggregation) aggFlushState { // note: flushState might be empty for the first flush - flushState := e.flushState[dirtyTime] + flushState := e.flushState[agg.startAtNanos] // copy the lockedAgg data to the flushState while holding the lock. agg.lockedAgg.Lock() flushState.dirty = agg.lockedAgg.dirty @@ -505,15 +513,15 @@ func (e *GenericElem) updateFlushStateWithLock(dirtyTime xtime.UnixNano, targetN agg.lockedAgg.Unlock() // update the flushState with everything else. - previousStartAligned, ok := e.previousStartAlignedWithLock(dirtyTime) + previousStartAligned, ok := e.previousStartAlignedWithLock(agg.startAtNanos) if ok { flushState.prevStartTime = previousStartAligned } else { flushState.prevStartTime = 0 } flushState.resendEnabled = agg.resendEnabled - flushState.startAt = dirtyTime - return flushState, true + flushState.startAt = agg.startAtNanos + return flushState } // Close closes the element. @@ -737,11 +745,9 @@ func (e *GenericElem) processValue( l.Error("previous start time not in state map", zap.Time("ts", ts)) }) - } else if prevFlushState.consumedValues != nil { - // prev consumedValues may be null if the result was NaN. - prev.Value = prevFlushState.consumedValues[aggTypeIdx] - prev.TimeNanos = int64(prevFlushState.timestamp) } + prev.Value = prevFlushState.consumedValues[aggTypeIdx] + prev.TimeNanos = int64(prevFlushState.timestamp) } curr := transformation.Datapoint{ TimeNanos: int64(flushState.timestamp), @@ -753,12 +759,10 @@ func (e *GenericElem) processValue( // We currently only support first-order derivative transformations so we only // need to keep one value. In the future if we need to support higher-order // derivative transformations, we need to store an array of values here. - if !math.IsNaN(curr.Value) { - if flushState.consumedValues == nil { - flushState.consumedValues = make([]float64, len(e.aggTypes)) - } - flushState.consumedValues[aggTypeIdx] = curr.Value + if flushState.consumedValues == nil { + flushState.consumedValues = make([]float64, len(e.aggTypes)) } + flushState.consumedValues[aggTypeIdx] = curr.Value value = res.Value case isUnaryMultiOp: curr := transformation.Datapoint{ diff --git a/src/aggregator/aggregator/timer_elem_gen.go b/src/aggregator/aggregator/timer_elem_gen.go index c59d05efd4..84fc68715e 100644 --- a/src/aggregator/aggregator/timer_elem_gen.go +++ b/src/aggregator/aggregator/timer_elem_gen.go @@ -38,7 +38,6 @@ import ( "github.com/m3db/m3/src/x/instrument" xtime "github.com/m3db/m3/src/x/time" - "github.com/scaleway/scaleway-sdk-go/logger" "github.com/willf/bitset" "go.uber.org/zap" ) @@ -122,9 +121,11 @@ func (e *TimerElem) ResetSetData(data ElemData) error { // AddUnion adds a metric value union at a given timestamp. func (e *TimerElem) AddUnion(timestamp time.Time, mu unaggregated.MetricUnion, resendEnabled bool) error { - return e.doAddUnion(timestamp, mu, resendEnabled, false, timestamp) + return e.doAddUnion(timestamp, mu, resendEnabled, false) } -func (e *TimerElem) doAddUnion(timestamp time.Time, mu unaggregated.MetricUnion, resendEnabled bool, retry bool, initialTimestamp time.Time) error { + +func (e *TimerElem) doAddUnion(timestamp time.Time, mu unaggregated.MetricUnion, resendEnabled bool, retry bool, +) error { alignedStart := timestamp.Truncate(e.sp.Resolution().Window) lockedAgg, err := e.findOrCreate(alignedStart.UnixNano(), createAggregationOptions{ resendEnabled: resendEnabled, @@ -134,14 +135,14 @@ func (e *TimerElem) doAddUnion(timestamp time.Time, mu unaggregated.MetricUnion, } lockedAgg.Lock() if lockedAgg.closed { + // Note: this might have created an entry in the dirty set for lockedAgg when calling findOrCreate, even though + // it's already closed. The Consume loop will detect this and clean it up. lockedAgg.Unlock() if !resendEnabled && !retry { - return e.doAddUnion(alignedStart.Add(e.sp.Resolution().Window), mu, false, true, alignedStart) + // handle the edge case where the aggregation was already flushed/closed because the current time is right + // at the boundary. just roll the untimed metric into the next aggregation. + return e.doAddUnion(alignedStart.Add(e.sp.Resolution().Window), mu, false, true) } - logger.Errorf("aggregation already closed", - zap.Time("timestamp", timestamp), - zap.Time("initialTimestamp", initialTimestamp), - zap.Bool("retry", retry)) return errAggregationClosed } lockedAgg.aggregation.AddUnion(timestamp, mu) @@ -230,10 +231,8 @@ func (e *TimerElem) expireValuesWithLock( } resolution := e.sp.Resolution().Window - // start after the minimum to ensure we always keep at least one value in the map for binary transformations. - currStart := e.minStartTime.Add(resolution) + currStart := e.minStartTime resendExpire := targetNanos - int64(e.bufferForPastTimedMetricFn(resolution)) - for isEarlierThanFn(int64(currStart), resolution, targetNanos) { if currV, ok := e.values[currStart]; ok { if currV.resendEnabled { @@ -257,7 +256,9 @@ func (e *TimerElem) expireValuesWithLock( // if this current value is closed and clean it will no longer be flushed. this means it's safe // to remove the previous value since it will no longer be needed for binary transformations. when the // next value is eligible to be expired, this current value will actually be removed. - if prevV, ok := e.values[e.minStartTime]; ok { + // if we're currently pointing at the start skip this there is no previous for the start. this ensures + // we always keep at least one value in the map for binary transformations. + if prevV, ok := e.values[e.minStartTime]; ok && currStart != e.minStartTime { // can't expire flush state until after the flushing, so we save the time to expire later. e.flushStateToExpire = append(e.flushStateToExpire, e.minStartTime) delete(e.values, e.minStartTime) @@ -357,8 +358,26 @@ func (e *TimerElem) Consume( dirtyTimes := e.dirty e.dirty = e.dirty[:0] for i, dirtyTime := range dirtyTimes { - flushState, ready := e.updateFlushStateWithLock(dirtyTime, targetNanos, isEarlierThanFn) - if !ready || !flushState.dirty { + if !isEarlierThanFn(int64(dirtyTime), resolution, targetNanos) { + // not ready yet + e.dirty = append(e.dirty, dirtyTime) + continue + } + agg, ok := e.values[dirtyTime] + if !ok { + // there is a race where a writer adds a closed aggregation to the dirty set. eventually the closed + // aggregation is expired and removed from the values map. ok to skip. + continue + } + + flushState := e.newFlushStateWithLock(agg) + if flushState.dirty && flushState.flushed && !flushState.resendEnabled { + instrument.EmitAndLogInvariantViolation(e.opts.InstrumentOptions(), func(l *zap.Logger) { + l.Error("reflushing aggregation without resendEnabled", zap.Any("flushState", flushState)) + }) + } + + if !flushState.dirty { // there is a race where the value was added to the dirty set, but the writer didn't actually update the // value yet (by marking dirty). add back to the dirty set so it can be processed in the next round once // the value has been updated. @@ -370,13 +389,16 @@ func (e *TimerElem) Consume( e.values[dirtyTime] = val e.toConsume = append(e.toConsume, flushState) - // potentially consume the nextAgg as well in case we need to cascade an update due to a resend. - nextAgg, ok := e.nextAggWithLock(dirtyTime, targetNanos, isEarlierThanFn) - if ok && (i == len(dirtyTimes)-1 || dirtyTimes[i+1] != nextAgg.startAtNanos) { - // if not already in the dirty set. - flushState, ready := e.updateFlushStateWithLock(nextAgg.startAtNanos, targetNanos, isEarlierThanFn) - if ready { - e.toConsume = append(e.toConsume, flushState) + // potentially consume the nextAgg as well in case we need to cascade an update from a previously flushed + // value. + if flushState.flushed { + nextAgg, ok := e.nextAggWithLock(dirtyTime, targetNanos, isEarlierThanFn) + // add if not already in the dirty set. + if ok && + isEarlierThanFn(int64(nextAgg.startAtNanos), resolution, targetNanos) && + // at the end of the dirty times OR the next dirty time does not match. + (i == len(dirtyTimes)-1 || dirtyTimes[i+1] != nextAgg.startAtNanos) { + e.toConsume = append(e.toConsume, e.newFlushStateWithLock(nextAgg)) } } } @@ -412,23 +434,9 @@ func (e *TimerElem) Consume( return canCollect } -func (e *TimerElem) updateFlushStateWithLock(dirtyTime xtime.UnixNano, targetNanos int64, - isEarlierThanFn isEarlierThanFn) (aggFlushState, bool) { - resolution := e.sp.Resolution().Window - agg, ok := e.values[dirtyTime] - if !ok { - ts := dirtyTime.ToTime() - instrument.EmitAndLogInvariantViolation(e.opts.InstrumentOptions(), func(l *zap.Logger) { - l.Error("dirty timestamp not in values map", zap.Time("ts", ts)) - }) - return aggFlushState{}, false - } - if !isEarlierThanFn(int64(dirtyTime), resolution, targetNanos) { - return aggFlushState{}, false - } - +func (e *TimerElem) newFlushStateWithLock(agg timedTimer) aggFlushState { // note: flushState might be empty for the first flush - flushState := e.flushState[dirtyTime] + flushState := e.flushState[agg.startAtNanos] // copy the lockedAgg data to the flushState while holding the lock. agg.lockedAgg.Lock() flushState.dirty = agg.lockedAgg.dirty @@ -442,15 +450,15 @@ func (e *TimerElem) updateFlushStateWithLock(dirtyTime xtime.UnixNano, targetNan agg.lockedAgg.Unlock() // update the flushState with everything else. - previousStartAligned, ok := e.previousStartAlignedWithLock(dirtyTime) + previousStartAligned, ok := e.previousStartAlignedWithLock(agg.startAtNanos) if ok { flushState.prevStartTime = previousStartAligned } else { flushState.prevStartTime = 0 } flushState.resendEnabled = agg.resendEnabled - flushState.startAt = dirtyTime - return flushState, true + flushState.startAt = agg.startAtNanos + return flushState } // Close closes the element. @@ -674,11 +682,9 @@ func (e *TimerElem) processValue( l.Error("previous start time not in state map", zap.Time("ts", ts)) }) - } else if prevFlushState.consumedValues != nil { - // prev consumedValues may be null if the result was NaN. - prev.Value = prevFlushState.consumedValues[aggTypeIdx] - prev.TimeNanos = int64(prevFlushState.timestamp) } + prev.Value = prevFlushState.consumedValues[aggTypeIdx] + prev.TimeNanos = int64(prevFlushState.timestamp) } curr := transformation.Datapoint{ TimeNanos: int64(flushState.timestamp), @@ -690,12 +696,10 @@ func (e *TimerElem) processValue( // We currently only support first-order derivative transformations so we only // need to keep one value. In the future if we need to support higher-order // derivative transformations, we need to store an array of values here. - if !math.IsNaN(curr.Value) { - if flushState.consumedValues == nil { - flushState.consumedValues = make([]float64, len(e.aggTypes)) - } - flushState.consumedValues[aggTypeIdx] = curr.Value + if flushState.consumedValues == nil { + flushState.consumedValues = make([]float64, len(e.aggTypes)) } + flushState.consumedValues[aggTypeIdx] = curr.Value value = res.Value case isUnaryMultiOp: curr := transformation.Datapoint{