Skip to content

Commit

Permalink
[receiver/prometheus] Validate that combined metrics all have the sam…
Browse files Browse the repository at this point in the history
…e timestamp (#9385)

* validate that combined metrics all have the same timestamp

* add unit tests

Co-authored-by: Alex Boten <[email protected]>
Co-authored-by: Daniel Jaglowski <[email protected]>
  • Loading branch information
3 people authored Jun 20, 2022
1 parent 426c5a9 commit 3f24405
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
- `exporter/awsemfexporter:`: Fix dead links in README.md. (#11027)
- `googlecloudexporter`: Fix (self-obs) point_count metric calculation, concurrent map write panic, and dropped log attributes (#11051)
- `signalfxexporter`: Event Type is a required field, if not set, set it to `unknown` to prevent signalfx ingest from dropping it (#11121)
- `prometheusreceiver`: validate that combined metric points (e.g. histograms) have the same timestamp (#9385)

## v0.53.0

Expand Down
4 changes: 4 additions & 0 deletions receiver/prometheusreceiver/internal/otlp_metricfamily.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,10 @@ func (mf *metricFamily) loadMetricGroupOrCreate(groupKey string, ls labels.Label
func (mf *metricFamily) Add(metricName string, ls labels.Labels, t int64, v float64) error {
groupKey := mf.getGroupKey(ls)
mg := mf.loadMetricGroupOrCreate(groupKey, ls, t)
if mg.ts != t {
mf.droppedTimeseries++
return fmt.Errorf("inconsistent timestamps on metric points for metric %v", metricName)
}
switch mf.mtype {
case pmetric.MetricDataTypeHistogram, pmetric.MetricDataTypeSummary:
switch {
Expand Down
123 changes: 87 additions & 36 deletions receiver/prometheusreceiver/internal/otlp_metricfamily_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func TestMetricGroupData_toDistributionUnitTest(t *testing.T) {
labels labels.Labels
scrapes []*scrape
want func() pmetric.HistogramDataPoint
wantErr bool
intervalStartTimeMs int64
}{
{
Expand All @@ -114,9 +115,9 @@ func TestMetricGroupData_toDistributionUnitTest(t *testing.T) {
scrapes: []*scrape{
{at: 11, value: 66, metric: "histogram_count"},
{at: 11, value: 1004.78, metric: "histogram_sum"},
{at: 13, value: 33, metric: "histogram_bucket", extraLabel: labels.Label{Name: "le", Value: "0.75"}},
{at: 13, value: 55, metric: "histogram_bucket", extraLabel: labels.Label{Name: "le", Value: "2.75"}},
{at: 13, value: 66, metric: "histogram_bucket", extraLabel: labels.Label{Name: "le", Value: "+Inf"}},
{at: 11, value: 33, metric: "histogram_bucket", extraLabel: labels.Label{Name: "le", Value: "0.75"}},
{at: 11, value: 55, metric: "histogram_bucket", extraLabel: labels.Label{Name: "le", Value: "2.75"}},
{at: 11, value: 66, metric: "histogram_bucket", extraLabel: labels.Label{Name: "le", Value: "+Inf"}},
},
want: func() pmetric.HistogramDataPoint {
point := pmetric.NewHistogramDataPoint()
Expand All @@ -140,9 +141,9 @@ func TestMetricGroupData_toDistributionUnitTest(t *testing.T) {
scrapes: []*scrape{
{at: 11, value: math.Float64frombits(value.StaleNaN), metric: "histogram_stale_count"},
{at: 11, value: math.Float64frombits(value.StaleNaN), metric: "histogram_stale_sum"},
{at: 13, value: math.Float64frombits(value.StaleNaN), metric: "histogram_bucket", extraLabel: labels.Label{Name: "le", Value: "0.75"}},
{at: 13, value: math.Float64frombits(value.StaleNaN), metric: "histogram_bucket", extraLabel: labels.Label{Name: "le", Value: "2.75"}},
{at: 13, value: math.Float64frombits(value.StaleNaN), metric: "histogram_bucket", extraLabel: labels.Label{Name: "le", Value: "+Inf"}},
{at: 11, value: math.Float64frombits(value.StaleNaN), metric: "histogram_bucket", extraLabel: labels.Label{Name: "le", Value: "0.75"}},
{at: 11, value: math.Float64frombits(value.StaleNaN), metric: "histogram_bucket", extraLabel: labels.Label{Name: "le", Value: "2.75"}},
{at: 11, value: math.Float64frombits(value.StaleNaN), metric: "histogram_bucket", extraLabel: labels.Label{Name: "le", Value: "+Inf"}},
},
want: func() pmetric.HistogramDataPoint {
point := pmetric.NewHistogramDataPoint()
Expand All @@ -157,20 +158,43 @@ func TestMetricGroupData_toDistributionUnitTest(t *testing.T) {
return point
},
},
{
name: "histogram with inconsistent timestamps",
metricName: "histogram_inconsistent_ts",
intervalStartTimeMs: 11,
labels: labels.Labels{{Name: "a", Value: "A"}, {Name: "le", Value: "0.75"}, {Name: "b", Value: "B"}},
scrapes: []*scrape{
{at: 11, value: math.Float64frombits(value.StaleNaN), metric: "histogram_stale_count"},
{at: 12, value: math.Float64frombits(value.StaleNaN), metric: "histogram_stale_sum"},
{at: 13, value: math.Float64frombits(value.StaleNaN), metric: "value"},
},
wantErr: true,
},
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
mp := newMetricFamily(tt.metricName, mc, zap.NewNop())
for _, tv := range tt.scrapes {
for i, tv := range tt.scrapes {
var lbls labels.Labels
if tv.extraLabel.Name != "" {
lbls = labels.NewBuilder(tt.labels).Set(tv.extraLabel.Name, tv.extraLabel.Value).Labels()
} else {
lbls = tt.labels.Copy()
}
require.NoError(t, mp.Add(tv.metric, lbls, tv.at, tv.value))
err := mp.Add(tv.metric, lbls, tv.at, tv.value)
if tt.wantErr {
if i != 0 {
require.Error(t, err)
}
} else {
require.NoError(t, err)
}
}
if tt.wantErr {
// Don't check the result if we got an error
return
}

require.Equal(t, 1, len(mp.groups), "Expecting exactly 1 groupKey")
Expand Down Expand Up @@ -209,57 +233,57 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) {
name string
labelsScrapes []*labelsScrapes
want func() pmetric.SummaryDataPoint
wantErr bool
}{
{
name: "summary",
labelsScrapes: []*labelsScrapes{
{
labels: labels.Labels{
{Name: "a", Value: "A"}, {Name: "b", Value: "B"},
},
scrapes: []*scrape{
{at: 14, value: 10, metric: "summary_count"},
{at: 14, value: 15, metric: "summary_sum"},
},
},
{
labels: labels.Labels{
{Name: "a", Value: "A"}, {Name: "quantile", Value: "0.0"}, {Name: "b", Value: "B"},
},
scrapes: []*scrape{
{at: 10, value: 10, metric: "summary_count"},
{at: 10, value: 12, metric: "summary_sum"},
{at: 10, value: 8, metric: "value"},
{at: 14, value: 8, metric: "value"},
},
},
{
labels: labels.Labels{
{Name: "a", Value: "A"}, {Name: "quantile", Value: "0.75"}, {Name: "b", Value: "B"},
},
scrapes: []*scrape{
{at: 11, value: 10, metric: "summary_count"},
{at: 11, value: 1004.78, metric: "summary_sum"},
{at: 11, value: 33.7, metric: "value"},
{at: 14, value: 33.7, metric: "value"},
},
},
{
labels: labels.Labels{
{Name: "a", Value: "A"}, {Name: "quantile", Value: "0.50"}, {Name: "b", Value: "B"},
},
scrapes: []*scrape{
{at: 12, value: 10, metric: "summary_count"},
{at: 12, value: 13, metric: "summary_sum"},
{at: 12, value: 27, metric: "value"},
{at: 14, value: 27, metric: "value"},
},
},
{
labels: labels.Labels{
{Name: "a", Value: "A"}, {Name: "quantile", Value: "0.90"}, {Name: "b", Value: "B"},
},
scrapes: []*scrape{
{at: 13, value: 10, metric: "summary_count"},
{at: 13, value: 14, metric: "summary_sum"},
{at: 13, value: 56, metric: "value"},
{at: 14, value: 56, metric: "value"},
},
},
{
labels: labels.Labels{
{Name: "a", Value: "A"}, {Name: "quantile", Value: "0.99"}, {Name: "b", Value: "B"},
},
scrapes: []*scrape{
{at: 14, value: 10, metric: "summary_count"},
{at: 14, value: 15, metric: "summary_sum"},
{at: 14, value: 82, metric: "value"},
},
},
Expand Down Expand Up @@ -300,39 +324,39 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) {
{Name: "a", Value: "A"}, {Name: "quantile", Value: "0.0"}, {Name: "b", Value: "B"},
},
scrapes: []*scrape{
{at: 10, value: 10, metric: "summary_stale_count"},
{at: 10, value: 12, metric: "summary_stale_sum"},
{at: 10, value: 8, metric: "value"},
{at: 14, value: 10, metric: "summary_stale_count"},
{at: 14, value: 12, metric: "summary_stale_sum"},
{at: 14, value: 8, metric: "value"},
},
},
{
labels: labels.Labels{
{Name: "a", Value: "A"}, {Name: "quantile", Value: "0.75"}, {Name: "b", Value: "B"},
},
scrapes: []*scrape{
{at: 11, value: 10, metric: "summary_stale_count"},
{at: 11, value: 1004.78, metric: "summary_stale_sum"},
{at: 11, value: 33.7, metric: "value"},
{at: 14, value: 10, metric: "summary_stale_count"},
{at: 14, value: 1004.78, metric: "summary_stale_sum"},
{at: 14, value: 33.7, metric: "value"},
},
},
{
labels: labels.Labels{
{Name: "a", Value: "A"}, {Name: "quantile", Value: "0.50"}, {Name: "b", Value: "B"},
},
scrapes: []*scrape{
{at: 12, value: 10, metric: "summary_stale_count"},
{at: 12, value: 13, metric: "summary_stale_sum"},
{at: 12, value: 27, metric: "value"},
{at: 14, value: 10, metric: "summary_stale_count"},
{at: 14, value: 13, metric: "summary_stale_sum"},
{at: 14, value: 27, metric: "value"},
},
},
{
labels: labels.Labels{
{Name: "a", Value: "A"}, {Name: "quantile", Value: "0.90"}, {Name: "b", Value: "B"},
},
scrapes: []*scrape{
{at: 13, value: 10, metric: "summary_stale_count"},
{at: 13, value: 14, metric: "summary_stale_sum"},
{at: 13, value: 56, metric: "value"},
{at: 14, value: 10, metric: "summary_stale_count"},
{at: 14, value: 14, metric: "summary_stale_sum"},
{at: 14, value: 56, metric: "value"},
},
},
{
Expand Down Expand Up @@ -373,17 +397,44 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) {
return point
},
},
{
name: "summary with inconsistent timestamps",
labelsScrapes: []*labelsScrapes{
{
labels: labels.Labels{
{Name: "a", Value: "A"}, {Name: "b", Value: "B"},
},
scrapes: []*scrape{
{at: 11, value: 10, metric: "summary_count"},
{at: 14, value: 15, metric: "summary_sum"},
},
},
},
wantErr: true,
},
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
mp := newMetricFamily(tt.name, mc, zap.NewNop())
for _, lbs := range tt.labelsScrapes {
for _, scrape := range lbs.scrapes {
require.NoError(t, mp.Add(scrape.metric, lbs.labels.Copy(), scrape.at, scrape.value))
for i, scrape := range lbs.scrapes {
err := mp.Add(scrape.metric, lbs.labels.Copy(), scrape.at, scrape.value)
if tt.wantErr {
// The first scrape won't have an error
if i != 0 {
require.Error(t, err)
}
} else {
require.NoError(t, err)
}
}
}
if tt.wantErr {
// Don't check the result if we got an error
return
}

require.Equal(t, 1, len(mp.groups), "Expecting exactly 1 groupKey")
// Get the lone group key.
Expand Down

0 comments on commit 3f24405

Please sign in to comment.