Skip to content

Commit

Permalink
pkg/metrics/wal: drop out-of-order exemplars (#5580)
Browse files Browse the repository at this point in the history
Signed-off-by: Paschalis Tsilias <[email protected]>
  • Loading branch information
tpaschalis authored Oct 24, 2023
1 parent 85c6f92 commit cd9d185
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 9 deletions.
6 changes: 5 additions & 1 deletion pkg/metrics/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -759,8 +759,12 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, _ labels.Labels, e exem
// Check for duplicate vs last stored exemplar for this series, and discard those.
// Otherwise, record the current exemplar as the latest.
// Prometheus' TSDB returns 0 when encountering duplicates, so we do the same here.
// TODO(@tpaschalis) The case of OOO exemplars is currently unique to
// Native Histograms. Prometheus is tracking a (possibly different) fix to
// this, we should revisit once they've settled on a solution.
// https://github.com/prometheus/prometheus/issues/12971
prevExemplar := a.w.series.GetLatestExemplar(s.ref)
if prevExemplar != nil && prevExemplar.Equals(e) {
if prevExemplar != nil && (prevExemplar.Equals(e) || prevExemplar.Ts > e.Ts) {
// Duplicate, don't return an error but don't accept the exemplar.
return 0, nil
}
Expand Down
39 changes: 31 additions & 8 deletions pkg/metrics/wal/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,16 @@ func TestStorage_DuplicateExemplarsIgnored(t *testing.T) {
_, _ = app.AppendExemplar(sRef, nil, e)
_, _ = app.AppendExemplar(sRef, nil, e)

e.Ts = 24
_, _ = app.AppendExemplar(sRef, nil, e)
_, _ = app.AppendExemplar(sRef, nil, e)

require.NoError(t, app.Commit())
collector := walDataCollector{}
replayer := walReplayer{w: &collector}
require.NoError(t, replayer.Replay(s.wal.Dir()))

// We had 9 calls to AppendExemplar but only 4 of those should have gotten through
// We had 11 calls to AppendExemplar but only 4 of those should have gotten through.
require.Equal(t, 4, len(collector.exemplars))
}

Expand Down Expand Up @@ -452,14 +456,11 @@ func TestDBAllowOOOSamples(t *testing.T) {
}

require.NoError(t, app.Commit())

for _, metric := range payload {
for _, sa := range metric.samples {
// We want to set the timestamp to before. This should no longer trigger an out of order.
sa.ts = sa.ts - 10_000
}
}
for _, metric := range payload {
metric.Write(t, app)
// We want to set the timestamp to before using this offset.
// This should no longer trigger an out of order.
metric.WriteOOO(t, app, 10_000)
}
}

Expand Down Expand Up @@ -525,6 +526,28 @@ func (s *series) Write(t *testing.T, app storage.Appender) {
}
}

func (s *series) WriteOOO(t *testing.T, app storage.Appender, tsOffset int64) {
t.Helper()

lbls := labels.FromMap(map[string]string{"__name__": s.name})

offset := 0
if s.ref == nil {
// Write first sample to get ref ID
ref, err := app.Append(0, lbls, s.samples[0].ts-tsOffset, s.samples[0].val)
require.NoError(t, err)

s.ref = &ref
offset = 1
}

// Write other data points with AddFast
for _, sample := range s.samples[offset:] {
_, err := app.Append(*s.ref, lbls, sample.ts-tsOffset, sample.val)
require.NoError(t, err)
}
}

type seriesList []*series

// Filter creates a new seriesList with series filtered by a sample
Expand Down

0 comments on commit cd9d185

Please sign in to comment.