Fix race when checking for dirty aggregations #3886
Conversation
@@ -75,25 +71,26 @@ type CounterElem struct {
	// startTime -> agg (new one per every resolution)
	values map[xtime.UnixNano]timedCounter
	// startTime -> state. this is local state to the flusher and does not need to be guarded with a lock.
	// values and flushState should always have the exact same key set.
	flushState map[xtime.UnixNano]aggFlushState
aggFlushState now holds all the state that is local to the flusher and does not need to be synchronized. Much easier to reason about.
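As a rough sketch of what such flusher-local state might look like (field names are inferred from the surrounding diff — `prevStartTime`, `consumedValues`, `flushed` — and are illustrative, not the real m3 definition):

```go
package main

import "fmt"

// UnixNano mirrors xtime.UnixNano: a start-aligned nanosecond timestamp.
type UnixNano int64

// aggFlushState is a sketch of the flusher-local state described above.
// Because only the flusher goroutine touches it, no lock is needed.
type aggFlushState struct {
	startAt        UnixNano  // start-aligned timestamp this state covers
	prevStartTime  UnixNano  // link to the previous flush state, 0 if none
	flushed        bool      // whether this aggregation has been flushed
	consumedValues []float64 // values consumed at the last flush, per agg type
}

func main() {
	// values and flushState share the same start-aligned key set, so the
	// flusher can read and write flushState without any synchronization.
	flushState := map[UnixNano]aggFlushState{
		100: {startAt: 100, flushed: true, consumedValues: []float64{42}},
	}
	fmt.Println(flushState[100].flushed) // true
}
```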
	elemBase:   newElemBase(opts),
	dirty:      make([]xtime.UnixNano, 0, defaultNumAggregations), // in most cases values will have two entries
	values:     make(map[xtime.UnixNano]timedCounter),
	flushState: make(map[xtime.UnixNano]aggFlushState),
Now there is a separate flushState map that is also start-aligned. This removes all the confusion of some things being start-aligned and some things being end-aligned.
	prevV.previousTimeNanos = xtime.UnixNano(timestampNanosFn(int64(e.minStartTime), resolution))
	e.toExpire = append(e.toExpire, prevV)
	// can't expire flush state until after the flushing, so we save the time to expire later.
	e.flushStateToExpire = append(e.flushStateToExpire, e.minStartTime)
flushState is kind of like the old consumedValues/previousTimeNanos, although it's much easier to reason about because those fields are scoped to a single timestamp on the flush state. If you need the previous consumed values, you just look them up from the previous flush state.
	// note: flushState might be empty for the first flush
	flushState := e.flushState[dirtyTime]
	// copy the lockedAgg data to the flushState while holding the lock.
the flush state now holds a copy of the lockedAgg values, so we don't need to hold the lock when actually processing the values.
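The copy-under-lock pattern described here can be sketched roughly as follows; `lockedAgg` and its fields are simplified stand-ins for the real types, not m3's actual definitions:

```go
package main

import (
	"fmt"
	"sync"
)

// lockedAgg is a simplified stand-in for a locked aggregation: every
// field must only be touched while holding the mutex.
type lockedAgg struct {
	sync.Mutex
	dirty  bool
	values []float64
}

// snapshot copies the aggregation's state while holding the lock so the
// flusher can process the copy afterwards without any locking.
func snapshot(agg *lockedAgg) (values []float64, dirty bool) {
	agg.Lock()
	defer agg.Unlock()
	values = append([]float64(nil), agg.values...) // private deep copy
	dirty = agg.dirty
	agg.dirty = false // the flusher has now observed this update
	return values, dirty
}

func main() {
	agg := &lockedAgg{dirty: true, values: []float64{1, 2, 3}}
	vals, wasDirty := snapshot(agg)
	// Processing vals here needs no lock: writers can't see this slice.
	fmt.Println(vals, wasDirty) // [1 2 3] true
}
```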
	if _, ok := e.consumedValues[prevTimeNanos]; ok {
		prev = e.consumedValues[prevTimeNanos][aggTypeIdx]
	if flushState.prevStartTime > 0 {
		prevFlushState, ok := e.flushState[flushState.prevStartTime]
Hopefully this is simpler. Now a flushState can point to a previous flushState through prevStartTime; from there you can get the previous consumedValues.
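A hedged sketch of that chaining (map and field names approximated from the diff, not the real m3 code):

```go
package main

import "fmt"

type UnixNano int64

type aggFlushState struct {
	prevStartTime  UnixNano
	consumedValues []float64
}

// prevConsumed follows a flush state's prevStartTime link to fetch the
// consumed values of the preceding aggregation. The second return value
// is false when there is no previous state (e.g. the first flush).
func prevConsumed(states map[UnixNano]aggFlushState, s aggFlushState) ([]float64, bool) {
	if s.prevStartTime == 0 {
		return nil, false
	}
	prev, ok := states[s.prevStartTime]
	if !ok {
		return nil, false
	}
	return prev.consumedValues, true
}

func main() {
	states := map[UnixNano]aggFlushState{
		100: {consumedValues: []float64{7}},
		200: {prevStartTime: 100}, // links back to the state at 100
	}
	vals, ok := prevConsumed(states, states[200])
	fmt.Println(vals, ok) // [7] true
}
```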
@@ -176,15 +176,46 @@ type elemBase struct {
	listType metricListType

	// Mutable states.
	tombstoned           bool
	closed               bool
	cachedSourceSetsLock sync.Mutex // nolint: structcheck
Don't need this lock anymore because the values lock is always held when cachedSourceSets is accessed, so it's redundant.
Codecov Report

@@           Coverage Diff            @@
##           master   #3886    +/-   ##
========================================
- Coverage    56.8%   56.7%   -0.2%
========================================
  Files         553     553
  Lines       63440   63325    -115
========================================
- Hits        36072   35940    -132
- Misses      24175   24187     +12
- Partials     3193    3198      +5
	if ok && !nextAgg.lockedAgg.dirty {
		nextAgg.previousTimeNanos = xtime.UnixNano(timestampNanosFn(int64(dirtyTime), resolution))
		e.toConsume = append(e.toConsume, nextAgg)
	if ok && (i == len(dirtyTimes)-1 || dirtyTimes[i+1] != nextAgg.startAtNanos) {
Is this new behavior to prevent duplicates?
it's not new behavior. previously we only cascaded the update if the nextAgg wasn't dirty. however again we were incorrectly reading the dirty bit of the next agg without holding the lock. I could grab the nextAgg lock here but I want to avoid more locking. this basically achieves the same thing by checking if it's already in the dirty set.
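That membership check can be sketched like this: instead of reading the next aggregation's dirty bit without holding its lock, consult the sorted dirtyTimes slice the flusher already owns. Names here are illustrative, not the actual m3 helpers:

```go
package main

import "fmt"

// nextIsAlreadyDirty reports whether the aggregation starting at next is
// already present in the sorted dirty set immediately after position i.
// If it is, it will be consumed on its own and needs no cascaded update;
// no lock on the next aggregation is required to decide this.
// dirtyTimes is assumed sorted ascending; i indexes the dirty time
// currently being processed.
func nextIsAlreadyDirty(dirtyTimes []int64, i int, next int64) bool {
	return i < len(dirtyTimes)-1 && dirtyTimes[i+1] == next
}

func main() {
	dirty := []int64{100, 200, 300}
	// Processing entry 0 (ts=100): the next aggregation starts at 200,
	// which is already dirty, so no cascade is needed.
	fmt.Println(nextIsAlreadyDirty(dirty, 0, 200)) // true
	// Processing entry 2 (ts=300): the next aggregation at 400 is not
	// dirty, so a cascaded update may be required.
	fmt.Println(nextIsAlreadyDirty(dirty, 2, 400)) // false
}
```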
Ok yeah this makes sense
	ts := dirtyTime.ToTime()
	instrument.EmitAndLogInvariantViolation(e.opts.InstrumentOptions(), func(l *zap.Logger) {
		l.Error("dirty timestamp not in values map", zap.Time("ts", ts))
	})
I think in this case we need to return false, otherwise agg is empty and we'd panic on agg.lockedAgg, right?
good catch. it's hard to have test coverage for these paths that should never happen and panic in tests.
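The guard being agreed on here (emit the invariant violation, then bail out instead of continuing with a zero value) might look roughly like this; the types are simplified stand-ins:

```go
package main

import "fmt"

type lockedAgg struct{ dirty bool }

// timedCounter is a simplified stand-in for the value stored per start
// time; its lockedAgg pointer is nil in the zero value.
type timedCounter struct{ lockedAgg *lockedAgg }

// lookupAgg returns false when the dirty timestamp is missing from the
// values map. The caller must stop on false: dereferencing the zero
// value's nil lockedAgg would panic.
func lookupAgg(values map[int64]timedCounter, dirtyTime int64) (timedCounter, bool) {
	agg, ok := values[dirtyTime]
	if !ok {
		// real code: instrument.EmitAndLogInvariantViolation(...)
		fmt.Println("invariant violated: dirty timestamp not in values map")
		return timedCounter{}, false
	}
	return agg, true
}

func main() {
	values := map[int64]timedCounter{100: {lockedAgg: &lockedAgg{}}}
	if _, ok := lookupAgg(values, 42); !ok {
		// without the early return we'd touch a nil lockedAgg below
		fmt.Println("skipped missing aggregation") // prints
	}
}
```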
This fixes the following race:
1. Writer creates a new aggregation and adds it to the dirty set.
2. Flusher processes the dirty set.
3. Writer updates the aggregation value.

This race was introduced with the recent refactor of resending to use an explicit dirty set. However, the race existed historically, before resending introduced the notion of a dirty bit. The fix is to ensure the individual aggregation lock is held when reading/writing the dirty bit. It was getting confusing which state truly needs to be guarded by the aggregation lock, so this also refactors the code to introduce an aggFlushState of flusher-local state that doesn't need any synchronization. Now all fields on the lockedAggregation must be accessed with the lock.
Force-pushed from 671ae6f to 6e09a00
Force-pushed from 6e09a00 to 30003f1
@@ -1071,6 +1073,80 @@ func TestAddUntimed_ResendEnabled(t *testing.T) {
	require.True(t, ok)
}

func TestAddUntimed_ClosedAggregation(t *testing.T) {
this test does a pretty good job illustrating the race condition and leak of the dirty set.
@@ -293,10 +294,8 @@ func (e *GenericElem) expireValuesWithLock( 
	}
	resolution := e.sp.Resolution().Window

	// start after the minimum to ensure we always keep at least one value in the map for binary transformations.
	currStart := e.minStartTime.Add(resolution)
	currStart := e.minStartTime
We actually want to run this loop for e.minStartTime so we close the aggregation.
	e.toConsume = append(e.toConsume, flushState)
	// potentially consume the nextAgg as well in case we need to cascade an update from a previously flushed
	// value.
	if flushState.flushed {
figured this guard would help reduce some unnecessary lookups in the common case.
So this can be done since a "nextAgg" can only precede a "flushed current agg"? Might be worth commenting
It's more that we only care to cascade an update when a previously flushed value has been updated. Will add a comment/example.
		flushState.consumedValues = make([]float64, len(e.aggTypes))
	}
	flushState.consumedValues[aggTypeIdx] = curr.Value
	if flushState.consumedValues == nil {
This guard was dumb. It just causes a potential nil pointer when accessing the previous consumedValues above. Instead of having to add a branch for nil, just store NaN, which the code can already handle anyway.
	if prevV, ok := e.values[e.minStartTime]; ok {
	// if we're currently pointing at the start skip this there is no previous for the start. this ensures
	// we always keep at least one value in the map for binary transformations.
	if prevV, ok := e.values[e.minStartTime]; ok && currStart != e.minStartTime {
Ah ok this makes sense. nit: this comment wording I think is missing a "because"
	}
	prev.Value = prevFlushState.consumedValues[aggTypeIdx]
Why's this unnecessary now? I guess anything "previous" must have consumedVals? That'd only be the case if the previous had a binary op though right?
The only time the consumed value of the previous aggregation could be nil is if the consumed value was NaN. This can happen for certain aggregations like Max when there is no data yet. I removed the guard below that did not populate consumedValues if the value was NaN, so this can never happen now. The code could already handle a NaN value, so it just seemed overly complicated. Removed 2 if statements that provided no value.

"That'd only be the case if the previous had a binary op though right?" - You're in the binaryOp case, so you're guaranteed the previous was a binary op as well. The pipeline can't change within an aggregation (it would create a new aggregation).
also see the other comment below.
	prevFlushState, ok := e.flushState[flushState.prevStartTime]
	if !ok {
		ts := flushState.prevStartTime.ToTime()
		instrument.EmitAndLogInvariantViolation(e.opts.InstrumentOptions(), func(l *zap.Logger) {
I think this is another case where not panicking here (e.g. in prod) will result in a panic below, because prevFlushState will have an empty consumedValues.
For something like this or some other bug it might be worth still keeping around that check against a nil consumedValues - just to prevent a panic. And we could just do EmitAndLogInvariantViolation if nil.
yea good catch
What this PR does / why we need it:
Fixes #
Special notes for your reviewer:
Does this PR introduce a user-facing and/or backwards incompatible change?:
Does this PR require updating code package or user-facing documentation?: