diff --git a/pkg/metrics/wal/wal.go b/pkg/metrics/wal/wal.go index e1176afdeccb..c3d74773755d 100644 --- a/pkg/metrics/wal/wal.go +++ b/pkg/metrics/wal/wal.go @@ -759,8 +759,12 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, _ labels.Labels, e exem // Check for duplicate vs last stored exemplar for this series, and discard those. // Otherwise, record the current exemplar as the latest. // Prometheus' TSDB returns 0 when encountering duplicates, so we do the same here. + // TODO(@tpaschalis) The case of OOO exemplars is currently unique to + // Native Histograms. Prometheus is tracking a (possibly different) fix to + // this, we should revisit once they've settled on a solution. + // https://github.com/prometheus/prometheus/issues/12971 prevExemplar := a.w.series.GetLatestExemplar(s.ref) - if prevExemplar != nil && prevExemplar.Equals(e) { + if prevExemplar != nil && (prevExemplar.Equals(e) || prevExemplar.Ts > e.Ts) { // Duplicate, don't return an error but don't accept the exemplar. return 0, nil } diff --git a/pkg/metrics/wal/wal_test.go b/pkg/metrics/wal/wal_test.go index 9aaf52d639d1..bf929edb51d3 100644 --- a/pkg/metrics/wal/wal_test.go +++ b/pkg/metrics/wal/wal_test.go @@ -163,12 +163,16 @@ func TestStorage_DuplicateExemplarsIgnored(t *testing.T) { _, _ = app.AppendExemplar(sRef, nil, e) _, _ = app.AppendExemplar(sRef, nil, e) + e.Ts = 24 + _, _ = app.AppendExemplar(sRef, nil, e) + _, _ = app.AppendExemplar(sRef, nil, e) + require.NoError(t, app.Commit()) collector := walDataCollector{} replayer := walReplayer{w: &collector} require.NoError(t, replayer.Replay(s.wal.Dir())) - // We had 9 calls to AppendExemplar but only 4 of those should have gotten through + // We had 11 calls to AppendExemplar but only 4 of those should have gotten through. require.Equal(t, 4, len(collector.exemplars)) } @@ -452,14 +456,11 @@ func TestDBAllowOOOSamples(t *testing.T) { } require.NoError(t, app.Commit()) + for _, metric := range payload { - for _, sa := range metric.samples { - // We want to set the timestamp to before. This should no longer trigger an out of order. - sa.ts = sa.ts - 10_000 - } - } - for _, metric := range payload { - metric.Write(t, app) + // We want to set the timestamp to before using this offset. + // This should no longer trigger an out of order. + metric.WriteOOO(t, app, 10_000) } } @@ -525,6 +526,28 @@ func (s *series) Write(t *testing.T, app storage.Appender) { } } +func (s *series) WriteOOO(t *testing.T, app storage.Appender, tsOffset int64) { + t.Helper() + + lbls := labels.FromMap(map[string]string{"__name__": s.name}) + + offset := 0 + if s.ref == nil { + // Write first sample to get ref ID + ref, err := app.Append(0, lbls, s.samples[0].ts-tsOffset, s.samples[0].val) + require.NoError(t, err) + + s.ref = &ref + offset = 1 + } + + // Write other data points with AddFast + for _, sample := range s.samples[offset:] { + _, err := app.Append(*s.ref, lbls, sample.ts-tsOffset, sample.val) + require.NoError(t, err) + } +} + type seriesList []*series // Filter creates a new seriesList with series filtered by a sample