diff --git a/CHANGELOG.md b/CHANGELOG.md index 705f2b279909..fd1b15cf8dac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,7 @@ - `exporter/awsemfexporter:`: Fix dead links in README.md. (#11027) - `googlecloudexporter`: Fix (self-obs) point_count metric calculation, concurrent map write panic, and dropped log attributes (#11051) - `signalfxexporter`: Event Type is a required field, if not set, set it to `unknown` to prevent signalfx ingest from dropping it (#11121) +- `prometheusreceiver`: validate that combined metric points (e.g. histograms) have the same timestamp (#9385) ## v0.53.0 diff --git a/receiver/prometheusreceiver/internal/otlp_metricfamily.go b/receiver/prometheusreceiver/internal/otlp_metricfamily.go index d19f57ec933f..32ad1c40a0ee 100644 --- a/receiver/prometheusreceiver/internal/otlp_metricfamily.go +++ b/receiver/prometheusreceiver/internal/otlp_metricfamily.go @@ -287,6 +287,10 @@ func (mf *metricFamily) loadMetricGroupOrCreate(groupKey string, ls labels.Label func (mf *metricFamily) Add(metricName string, ls labels.Labels, t int64, v float64) error { groupKey := mf.getGroupKey(ls) mg := mf.loadMetricGroupOrCreate(groupKey, ls, t) + if mg.ts != t { + mf.droppedTimeseries++ + return fmt.Errorf("inconsistent timestamps on metric points for metric %v", metricName) + } switch mf.mtype { case pmetric.MetricDataTypeHistogram, pmetric.MetricDataTypeSummary: switch { diff --git a/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go b/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go index 26b57557459b..eaaf2a89cebc 100644 --- a/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go +++ b/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go @@ -104,6 +104,7 @@ func TestMetricGroupData_toDistributionUnitTest(t *testing.T) { labels labels.Labels scrapes []*scrape want func() pmetric.HistogramDataPoint + wantErr bool intervalStartTimeMs int64 }{ { @@ -114,9 +115,9 @@ func TestMetricGroupData_toDistributionUnitTest(t *testing.T) { scrapes: []*scrape{ {at: 11, value: 66, metric: "histogram_count"}, {at: 11, value: 1004.78, metric: "histogram_sum"}, - {at: 13, value: 33, metric: "histogram_bucket", extraLabel: labels.Label{Name: "le", Value: "0.75"}}, - {at: 13, value: 55, metric: "histogram_bucket", extraLabel: labels.Label{Name: "le", Value: "2.75"}}, - {at: 13, value: 66, metric: "histogram_bucket", extraLabel: labels.Label{Name: "le", Value: "+Inf"}}, + {at: 11, value: 33, metric: "histogram_bucket", extraLabel: labels.Label{Name: "le", Value: "0.75"}}, + {at: 11, value: 55, metric: "histogram_bucket", extraLabel: labels.Label{Name: "le", Value: "2.75"}}, + {at: 11, value: 66, metric: "histogram_bucket", extraLabel: labels.Label{Name: "le", Value: "+Inf"}}, }, want: func() pmetric.HistogramDataPoint { point := pmetric.NewHistogramDataPoint() @@ -140,9 +141,9 @@ func TestMetricGroupData_toDistributionUnitTest(t *testing.T) { scrapes: []*scrape{ {at: 11, value: math.Float64frombits(value.StaleNaN), metric: "histogram_stale_count"}, {at: 11, value: math.Float64frombits(value.StaleNaN), metric: "histogram_stale_sum"}, - {at: 13, value: math.Float64frombits(value.StaleNaN), metric: "histogram_bucket", extraLabel: labels.Label{Name: "le", Value: "0.75"}}, - {at: 13, value: math.Float64frombits(value.StaleNaN), metric: "histogram_bucket", extraLabel: labels.Label{Name: "le", Value: "2.75"}}, - {at: 13, value: math.Float64frombits(value.StaleNaN), metric: "histogram_bucket", extraLabel: labels.Label{Name: "le", Value: "+Inf"}}, + {at: 11, value: math.Float64frombits(value.StaleNaN), metric: "histogram_bucket", extraLabel: labels.Label{Name: "le", Value: "0.75"}}, + {at: 11, value: math.Float64frombits(value.StaleNaN), metric: "histogram_bucket", extraLabel: labels.Label{Name: "le", Value: "2.75"}}, + {at: 11, value: math.Float64frombits(value.StaleNaN), metric: "histogram_bucket", extraLabel: labels.Label{Name: "le", Value: "+Inf"}}, }, want: func() pmetric.HistogramDataPoint { point := pmetric.NewHistogramDataPoint() @@ -157,20 +158,43 @@ func TestMetricGroupData_toDistributionUnitTest(t *testing.T) { return point }, }, + { + name: "histogram with inconsistent timestamps", + metricName: "histogram_inconsistent_ts", + intervalStartTimeMs: 11, + labels: labels.Labels{{Name: "a", Value: "A"}, {Name: "le", Value: "0.75"}, {Name: "b", Value: "B"}}, + scrapes: []*scrape{ + {at: 11, value: math.Float64frombits(value.StaleNaN), metric: "histogram_stale_count"}, + {at: 12, value: math.Float64frombits(value.StaleNaN), metric: "histogram_stale_sum"}, + {at: 13, value: math.Float64frombits(value.StaleNaN), metric: "value"}, + }, + wantErr: true, + }, } for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { mp := newMetricFamily(tt.metricName, mc, zap.NewNop()) - for _, tv := range tt.scrapes { + for i, tv := range tt.scrapes { var lbls labels.Labels if tv.extraLabel.Name != "" { lbls = labels.NewBuilder(tt.labels).Set(tv.extraLabel.Name, tv.extraLabel.Value).Labels() } else { lbls = tt.labels.Copy() } - require.NoError(t, mp.Add(tv.metric, lbls, tv.at, tv.value)) + err := mp.Add(tv.metric, lbls, tv.at, tv.value) + if tt.wantErr { + if i != 0 { + require.Error(t, err) + } + } else { + require.NoError(t, err) + } + } + if tt.wantErr { + // Don't check the result if we got an error + return } require.Equal(t, 1, len(mp.groups), "Expecting exactly 1 groupKey") @@ -209,18 +233,26 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) { name string labelsScrapes []*labelsScrapes want func() pmetric.SummaryDataPoint + wantErr bool }{ { name: "summary", labelsScrapes: []*labelsScrapes{ + { + labels: labels.Labels{ + {Name: "a", Value: "A"}, {Name: "b", Value: "B"}, + }, + scrapes: []*scrape{ + {at: 14, value: 10, metric: "summary_count"}, + {at: 14, value: 15, metric: "summary_sum"}, + }, + }, { labels: labels.Labels{ {Name: "a", Value: "A"}, {Name: "quantile", Value: "0.0"}, {Name: "b", Value: "B"}, }, scrapes: []*scrape{ - {at: 10, value: 10, metric: "summary_count"}, - {at: 10, value: 12, metric: "summary_sum"}, - {at: 10, value: 8, metric: "value"}, + {at: 14, value: 8, metric: "value"}, }, }, { @@ -228,9 +260,7 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) { {Name: "a", Value: "A"}, {Name: "quantile", Value: "0.75"}, {Name: "b", Value: "B"}, }, scrapes: []*scrape{ - {at: 11, value: 10, metric: "summary_count"}, - {at: 11, value: 1004.78, metric: "summary_sum"}, - {at: 11, value: 33.7, metric: "value"}, + {at: 14, value: 33.7, metric: "value"}, }, }, { @@ -238,9 +268,7 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) { {Name: "a", Value: "A"}, {Name: "quantile", Value: "0.50"}, {Name: "b", Value: "B"}, }, scrapes: []*scrape{ - {at: 12, value: 10, metric: "summary_count"}, - {at: 12, value: 13, metric: "summary_sum"}, - {at: 12, value: 27, metric: "value"}, + {at: 14, value: 27, metric: "value"}, }, }, { @@ -248,9 +276,7 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) { {Name: "a", Value: "A"}, {Name: "quantile", Value: "0.90"}, {Name: "b", Value: "B"}, }, scrapes: []*scrape{ - {at: 13, value: 10, metric: "summary_count"}, - {at: 13, value: 14, metric: "summary_sum"}, - {at: 13, value: 56, metric: "value"}, + {at: 14, value: 56, metric: "value"}, }, }, { @@ -258,8 +284,6 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) { {Name: "a", Value: "A"}, {Name: "quantile", Value: "0.99"}, {Name: "b", Value: "B"}, }, scrapes: []*scrape{ - {at: 14, value: 10, metric: "summary_count"}, - {at: 14, value: 15, metric: "summary_sum"}, {at: 14, value: 82, metric: "value"}, }, }, @@ -300,9 +324,9 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) { {Name: "a", Value: "A"}, {Name: "quantile", Value: "0.0"}, {Name: "b", Value: "B"}, }, scrapes: []*scrape{ - {at: 10, value: 10, metric: "summary_stale_count"}, - {at: 10, value: 12, metric: "summary_stale_sum"}, - {at: 10, value: 8, metric: "value"}, + {at: 14, value: 10, metric: "summary_stale_count"}, + {at: 14, value: 12, metric: "summary_stale_sum"}, + {at: 14, value: 8, metric: "value"}, }, }, { @@ -310,9 +334,9 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) { {Name: "a", Value: "A"}, {Name: "quantile", Value: "0.75"}, {Name: "b", Value: "B"}, }, scrapes: []*scrape{ - {at: 11, value: 10, metric: "summary_stale_count"}, - {at: 11, value: 1004.78, metric: "summary_stale_sum"}, - {at: 11, value: 33.7, metric: "value"}, + {at: 14, value: 10, metric: "summary_stale_count"}, + {at: 14, value: 1004.78, metric: "summary_stale_sum"}, + {at: 14, value: 33.7, metric: "value"}, }, }, { @@ -320,9 +344,9 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) { {Name: "a", Value: "A"}, {Name: "quantile", Value: "0.50"}, {Name: "b", Value: "B"}, }, scrapes: []*scrape{ - {at: 12, value: 10, metric: "summary_stale_count"}, - {at: 12, value: 13, metric: "summary_stale_sum"}, - {at: 12, value: 27, metric: "value"}, + {at: 14, value: 10, metric: "summary_stale_count"}, + {at: 14, value: 13, metric: "summary_stale_sum"}, + {at: 14, value: 27, metric: "value"}, }, }, { @@ -330,9 +354,9 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) { {Name: "a", Value: "A"}, {Name: "quantile", Value: "0.90"}, {Name: "b", Value: "B"}, }, scrapes: []*scrape{ - {at: 13, value: 10, metric: "summary_stale_count"}, - {at: 13, value: 14, metric: "summary_stale_sum"}, - {at: 13, value: 56, metric: "value"}, + {at: 14, value: 10, metric: "summary_stale_count"}, + {at: 14, value: 14, metric: "summary_stale_sum"}, + {at: 14, value: 56, metric: "value"}, }, }, { @@ -373,6 +397,21 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) { return point }, }, + { + name: "summary with inconsistent timestamps", + labelsScrapes: []*labelsScrapes{ + { + labels: labels.Labels{ + {Name: "a", Value: "A"}, {Name: "b", Value: "B"}, + }, + scrapes: []*scrape{ + {at: 11, value: 10, metric: "summary_count"}, + {at: 14, value: 15, metric: "summary_sum"}, + }, + }, + }, + wantErr: true, + }, } for _, tt := range tests { @@ -380,10 +419,22 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) { t.Run(tt.name, func(t *testing.T) { mp := newMetricFamily(tt.name, mc, zap.NewNop()) for _, lbs := range tt.labelsScrapes { - for _, scrape := range lbs.scrapes { - require.NoError(t, mp.Add(scrape.metric, lbs.labels.Copy(), scrape.at, scrape.value)) + for i, scrape := range lbs.scrapes { + err := mp.Add(scrape.metric, lbs.labels.Copy(), scrape.at, scrape.value) + if tt.wantErr { + // The first scrape won't have an error + if i != 0 { + require.Error(t, err) + } + } else { + require.NoError(t, err) + } } } + if tt.wantErr { + // Don't check the result if we got an error + return + } require.Equal(t, 1, len(mp.groups), "Expecting exactly 1 groupKey") // Get the lone group key.