diff --git a/CHANGELOG.md b/CHANGELOG.md index c72013ebb1ce..b195a8b3dc67 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,8 @@ - `tailsamplingprocessor`: Fix composite sampler with inverse policy - `awsprometheusremotewriteexporter`: Fix signing of empty request bodies. (#10578) - `sigv4authextension`: Fix signing of empty request bodies. (#10578) +- `prometheusexporter`: Converting monotonic Delta to Cumulative sums (#9919) +- `statsdreceiver`: Update the lastIntervalTime for Counter metrics (#9919) ## v0.52.0 diff --git a/exporter/prometheusexporter/accumulator.go b/exporter/prometheusexporter/accumulator.go index cd7cf41de72f..8b73938ac209 100644 --- a/exporter/prometheusexporter/accumulator.go +++ b/exporter/prometheusexporter/accumulator.go @@ -172,8 +172,13 @@ func (a *lastValueAccumulator) accumulateGauge(metric pmetric.Metric, il pcommon func (a *lastValueAccumulator) accumulateSum(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) (n int) { doubleSum := metric.Sum() - // Drop metrics with non-cumulative aggregations - if doubleSum.AggregationTemporality() != pmetric.MetricAggregationTemporalityCumulative { + // Drop metrics with unspecified aggregations + if doubleSum.AggregationTemporality() == pmetric.MetricAggregationTemporalityUnspecified { + return + } + + // Drop non-monotonic and non-cumulative metrics + if doubleSum.AggregationTemporality() == pmetric.MetricAggregationTemporalityDelta && !doubleSum.IsMonotonic() { return } @@ -204,6 +209,17 @@ func (a *lastValueAccumulator) accumulateSum(metric pmetric.Metric, il pcommon.I continue } + // Delta-to-Cumulative + if doubleSum.AggregationTemporality() == pmetric.MetricAggregationTemporalityDelta && ip.StartTimestamp() == mv.value.Sum().DataPoints().At(0).StartTimestamp() { + ip.SetStartTimestamp(mv.value.Sum().DataPoints().At(0).StartTimestamp()) + switch ip.ValueType() { + case pmetric.NumberDataPointValueTypeInt: + ip.SetIntVal(ip.IntVal() + mv.value.Sum().DataPoints().At(0).IntVal()) + case pmetric.NumberDataPointValueTypeDouble: + ip.SetDoubleVal(ip.DoubleVal() + mv.value.Sum().DataPoints().At(0).DoubleVal()) + } + } + m := createMetric(metric) m.Sum().SetIsMonotonic(metric.Sum().IsMonotonic()) m.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative) diff --git a/exporter/prometheusexporter/accumulator_test.go b/exporter/prometheusexporter/accumulator_test.go index 86ca235aae99..bd5f87e0ce52 100644 --- a/exporter/prometheusexporter/accumulator_test.go +++ b/exporter/prometheusexporter/accumulator_test.go @@ -39,32 +39,6 @@ func TestAccumulateDeltaAggregation(t *testing.T) { name string fillMetric func(time.Time, pmetric.Metric) }{ - { - name: "IntSum", - fillMetric: func(ts time.Time, metric pmetric.Metric) { - metric.SetName("test_metric") - metric.SetDataType(pmetric.MetricDataTypeSum) - metric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta) - dp := metric.Sum().DataPoints().AppendEmpty() - dp.SetIntVal(42) - dp.Attributes().InsertString("label_1", "1") - dp.Attributes().InsertString("label_2", "2") - dp.SetTimestamp(pcommon.NewTimestampFromTime(ts)) - }, - }, - { - name: "Sum", - fillMetric: func(ts time.Time, metric pmetric.Metric) { - metric.SetName("test_metric") - metric.SetDataType(pmetric.MetricDataTypeSum) - metric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta) - dp := metric.Sum().DataPoints().AppendEmpty() - dp.SetDoubleVal(42.42) - dp.Attributes().InsertString("label_1", "1") - dp.Attributes().InsertString("label_2", "2") - dp.SetTimestamp(pcommon.NewTimestampFromTime(ts)) - }, - }, { name: "Histogram", fillMetric: func(ts time.Time, metric pmetric.Metric) { @@ -386,6 +360,178 @@ func TestAccumulateMetrics(t *testing.T) { } } +func TestAccumulateDeltaToCumulative(t *testing.T) { + tests := []struct { + name string + metric func(time.Time, time.Time, float64, pmetric.MetricSlice) + }{ + { + name: "MonotonicDeltaIntSum", + metric: func(startTimestamp, ts time.Time, v float64, metrics pmetric.MetricSlice) { + metric := metrics.AppendEmpty() + metric.SetName("test_metric") + metric.SetDataType(pmetric.MetricDataTypeSum) + metric.SetDescription("test description") + metric.Sum().SetIsMonotonic(true) + metric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta) + dp := metric.Sum().DataPoints().AppendEmpty() + dp.SetIntVal(int64(v)) + dp.Attributes().InsertString("label_1", "1") + dp.Attributes().InsertString("label_2", "2") + dp.SetStartTimestamp(pcommon.NewTimestampFromTime(startTimestamp)) + dp.SetTimestamp(pcommon.NewTimestampFromTime(ts)) + }, + }, + { + name: "MonotonicDeltaSum", + metric: func(startTimestamp, timestamp time.Time, v float64, metrics pmetric.MetricSlice) { + metric := metrics.AppendEmpty() + metric.SetName("test_metric") + metric.SetDataType(pmetric.MetricDataTypeSum) + metric.Sum().SetIsMonotonic(true) + metric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta) + metric.SetDescription("test description") + dp := metric.Sum().DataPoints().AppendEmpty() + dp.SetDoubleVal(v) + dp.Attributes().InsertString("label_1", "1") + dp.Attributes().InsertString("label_2", "2") + dp.SetStartTimestamp(pcommon.NewTimestampFromTime(startTimestamp)) + dp.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + ts1 := time.Now().Add(-3 * time.Second) + ts2 := time.Now().Add(-2 * time.Second) + ts3 := time.Now().Add(-1 * time.Second) + + resourceMetrics := pmetric.NewResourceMetrics() + ilm := resourceMetrics.ScopeMetrics().AppendEmpty() + ilm.Scope().SetName("test") + a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator) + + // The first point arrived + tt.metric(ts1, ts2, 11, ilm.Metrics()) + n := a.Accumulate(resourceMetrics) + + require.Equal(t, 1, n) + + // The next point arrived + tt.metric(ts2, ts3, 31, ilm.Metrics()) + n = a.Accumulate(resourceMetrics) + + require.Equal(t, 2, n) + + mLabels, _, mValue, _, mIsMonotonic := getMetricProperties(ilm.Metrics().At(1)) + signature := timeseriesSignature(ilm.Scope().Name(), ilm.Metrics().At(0), mLabels) + m, ok := a.registeredMetrics.Load(signature) + require.True(t, ok) + + v := m.(*accumulatedValue) + vLabels, vTS, vValue, vTemporality, vIsMonotonic := getMetricProperties(v.value) + + require.Equal(t, v.scope.Name(), "test") + require.Equal(t, v.value.DataType(), ilm.Metrics().At(0).DataType()) + require.Equal(t, v.value.DataType(), ilm.Metrics().At(1).DataType()) + + vLabels.Range(func(k string, v pcommon.Value) bool { + r, _ := mLabels.Get(k) + require.Equal(t, r, v) + return true + }) + require.Equal(t, mLabels.Len(), vLabels.Len()) + require.Equal(t, mValue, vValue) + require.Equal(t, pmetric.MetricAggregationTemporalityCumulative, vTemporality) + require.Equal(t, mIsMonotonic, vIsMonotonic) + + require.Equal(t, ts3.Unix(), vTS.Unix()) + }) + } +} + +func TestAccumulateDroppedMetrics(t *testing.T) { + tests := []struct { + name string + fillMetric func(time.Time, pmetric.Metric) + }{ + { + name: "NonMonotonicIntSum", + fillMetric: func(ts time.Time, metric pmetric.Metric) { + metric.SetName("test_metric") + metric.SetDataType(pmetric.MetricDataTypeSum) + metric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta) + metric.Sum().SetIsMonotonic(false) + dp := metric.Sum().DataPoints().AppendEmpty() + dp.SetIntVal(42) + dp.Attributes().InsertString("label_1", "1") + dp.Attributes().InsertString("label_2", "2") + dp.SetTimestamp(pcommon.NewTimestampFromTime(ts)) + }, + }, + { + name: "NonMonotonicSum", + fillMetric: func(ts time.Time, metric pmetric.Metric) { + metric.SetName("test_metric") + metric.SetDataType(pmetric.MetricDataTypeSum) + metric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta) + metric.Sum().SetIsMonotonic(false) + dp := metric.Sum().DataPoints().AppendEmpty() + dp.SetDoubleVal(42.42) + dp.Attributes().InsertString("label_1", "1") + dp.Attributes().InsertString("label_2", "2") + dp.SetTimestamp(pcommon.NewTimestampFromTime(ts)) + }, + }, + { + name: "UnspecifiedIntSum", + fillMetric: func(ts time.Time, metric pmetric.Metric) { + metric.SetName("test_metric") + metric.SetDataType(pmetric.MetricDataTypeSum) + metric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityUnspecified) + dp := metric.Sum().DataPoints().AppendEmpty() + dp.SetIntVal(42) + dp.Attributes().InsertString("label_1", "1") + dp.Attributes().InsertString("label_2", "2") + dp.SetTimestamp(pcommon.NewTimestampFromTime(ts)) + }, + }, + { + name: "UnspecifiedSum", + fillMetric: func(ts time.Time, metric pmetric.Metric) { + metric.SetName("test_metric") + metric.SetDataType(pmetric.MetricDataTypeSum) + metric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityUnspecified) + dp := metric.Sum().DataPoints().AppendEmpty() + dp.SetDoubleVal(42.42) + dp.Attributes().InsertString("label_1", "1") + dp.Attributes().InsertString("label_2", "2") + dp.SetTimestamp(pcommon.NewTimestampFromTime(ts)) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + resourceMetrics := pmetric.NewResourceMetrics() + ilm := resourceMetrics.ScopeMetrics().AppendEmpty() + ilm.Scope().SetName("test") + tt.fillMetric(time.Now(), ilm.Metrics().AppendEmpty()) + + a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator) + n := a.Accumulate(resourceMetrics) + require.Equal(t, 0, n) + + signature := timeseriesSignature(ilm.Scope().Name(), ilm.Metrics().At(0), pcommon.NewMap()) + v, ok := a.registeredMetrics.Load(signature) + require.False(t, ok) + require.Nil(t, v) + }) + } +} + func getMetricProperties(metric pmetric.Metric) ( attributes pcommon.Map, ts time.Time, diff --git a/receiver/statsdreceiver/protocol/statsd_parser.go b/receiver/statsdreceiver/protocol/statsd_parser.go index 994c085a68a6..5923b50ea03a 100644 --- a/receiver/statsdreceiver/protocol/statsd_parser.go +++ b/receiver/statsdreceiver/protocol/statsd_parser.go @@ -165,7 +165,6 @@ func (p *StatsDParser) GetMetrics() pmetric.Metrics { ) } - p.lastIntervalTime = timeNowFunc() p.gauges = make(map[statsDMetricDescription]pmetric.ScopeMetrics) p.counters = make(map[statsDMetricDescription]pmetric.ScopeMetrics) p.timersAndDistributions = make([]pmetric.ScopeMetrics, 0) @@ -210,7 +209,9 @@ func (p *StatsDParser) Aggregate(line string) error { case CounterType: _, ok := p.counters[parsedMetric.description] if !ok { - p.counters[parsedMetric.description] = buildCounterMetric(parsedMetric, p.isMonotonicCounter, timeNowFunc(), p.lastIntervalTime) + timeNow := timeNowFunc() + p.counters[parsedMetric.description] = buildCounterMetric(parsedMetric, p.isMonotonicCounter, timeNow, p.lastIntervalTime) + p.lastIntervalTime = timeNow } else { point := p.counters[parsedMetric.description].Metrics().At(0).Sum().DataPoints().At(0) point.SetIntVal(point.IntVal() + parsedMetric.counterValue()) diff --git a/receiver/statsdreceiver/protocol/statsd_parser_test.go b/receiver/statsdreceiver/protocol/statsd_parser_test.go index 6927ccf8391b..7f65bd7a5072 100644 --- a/receiver/statsdreceiver/protocol/statsd_parser_test.go +++ b/receiver/statsdreceiver/protocol/statsd_parser_test.go @@ -589,7 +589,7 @@ func TestStatsDParser_Aggregate(t *testing.T) { testDescription("statsdTestMetric1", "c", []string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", 7000, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(611, 0)), testDescription("statsdTestMetric2", "c", - []string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 50, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(611, 0)), + []string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 50, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(711, 0)), }, expectedTimer: []pmetric.ScopeMetrics{}, }, @@ -614,7 +614,7 @@ func TestStatsDParser_Aggregate(t *testing.T) { testDescription("statsdTestMetric1", "c", []string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", 7000, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(611, 0)), testDescription("statsdTestMetric2", "c", - []string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 50, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(611, 0)), + []string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 50, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(711, 0)), }, expectedTimer: []pmetric.ScopeMetrics{}, }, @@ -642,7 +642,7 @@ func TestStatsDParser_Aggregate(t *testing.T) { testDescription("statsdTestMetric1", "c", []string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", 215, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(611, 0)), testDescription("statsdTestMetric2", "c", - []string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 75, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(611, 0)), + []string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 75, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(711, 0)), }, expectedTimer: []pmetric.ScopeMetrics{}, }, @@ -729,7 +729,7 @@ func TestStatsDParser_AggregateWithMetricType(t *testing.T) { testDescription("statsdTestMetric1", "c", []string{"mykey", "metric_type"}, []string{"myvalue", "counter"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", 7000, false, "c", 0, []string{"mykey", "metric_type"}, []string{"myvalue", "counter"}), false, time.Unix(711, 0), time.Unix(611, 0)), testDescription("statsdTestMetric2", "c", - []string{"mykey", "metric_type"}, []string{"myvalue", "counter"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 50, false, "c", 0, []string{"mykey", "metric_type"}, []string{"myvalue", "counter"}), false, time.Unix(711, 0), time.Unix(611, 0)), + []string{"mykey", "metric_type"}, []string{"myvalue", "counter"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 50, false, "c", 0, []string{"mykey", "metric_type"}, []string{"myvalue", "counter"}), false, time.Unix(711, 0), time.Unix(711, 0)), }, }, } @@ -777,7 +777,7 @@ func TestStatsDParser_AggregateWithIsMonotonicCounter(t *testing.T) { testDescription("statsdTestMetric1", "c", []string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", 7000, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), true, time.Unix(711, 0), time.Unix(611, 0)), testDescription("statsdTestMetric2", "c", - []string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 50, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), true, time.Unix(711, 0), time.Unix(611, 0)), + []string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 50, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), true, time.Unix(711, 0), time.Unix(711, 0)), }, }, }