Skip to content

Commit

Permalink
feat: delta to cumulative prometheus (#9919)
Browse files Browse the repository at this point in the history
  • Loading branch information
locmai authored Jun 7, 2022
1 parent 2650aac commit c522cef
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 35 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
- `tailsamplingprocessor`: Fix composite sampler with inverse policy
- `awsprometheusremotewriteexporter`: Fix signing of empty request bodies. (#10578)
- `sigv4authextension`: Fix signing of empty request bodies. (#10578)
- `prometheusexporter`: Converting monotonic Delta to Cumulative sums (#9919)
- `statsdreceiver`: Update the lastIntervalTime for Counter metrics (#9919)

## v0.52.0

Expand Down
20 changes: 18 additions & 2 deletions exporter/prometheusexporter/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,13 @@ func (a *lastValueAccumulator) accumulateGauge(metric pmetric.Metric, il pcommon
func (a *lastValueAccumulator) accumulateSum(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) (n int) {
doubleSum := metric.Sum()

// Drop metrics with non-cumulative aggregations
if doubleSum.AggregationTemporality() != pmetric.MetricAggregationTemporalityCumulative {
// Drop metrics with unspecified aggregations
if doubleSum.AggregationTemporality() == pmetric.MetricAggregationTemporalityUnspecified {
return
}

// Drop non-monotonic and non-cumulative metrics
if doubleSum.AggregationTemporality() == pmetric.MetricAggregationTemporalityDelta && !doubleSum.IsMonotonic() {
return
}

Expand Down Expand Up @@ -204,6 +209,17 @@ func (a *lastValueAccumulator) accumulateSum(metric pmetric.Metric, il pcommon.I
continue
}

// Delta-to-Cumulative
if doubleSum.AggregationTemporality() == pmetric.MetricAggregationTemporalityDelta && ip.StartTimestamp() == mv.value.Sum().DataPoints().At(0).StartTimestamp() {
ip.SetStartTimestamp(mv.value.Sum().DataPoints().At(0).StartTimestamp())
switch ip.ValueType() {
case pmetric.NumberDataPointValueTypeInt:
ip.SetIntVal(ip.IntVal() + mv.value.Sum().DataPoints().At(0).IntVal())
case pmetric.NumberDataPointValueTypeDouble:
ip.SetDoubleVal(ip.DoubleVal() + mv.value.Sum().DataPoints().At(0).DoubleVal())
}
}

m := createMetric(metric)
m.Sum().SetIsMonotonic(metric.Sum().IsMonotonic())
m.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative)
Expand Down
198 changes: 172 additions & 26 deletions exporter/prometheusexporter/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,32 +39,6 @@ func TestAccumulateDeltaAggregation(t *testing.T) {
name string
fillMetric func(time.Time, pmetric.Metric)
}{
{
name: "IntSum",
fillMetric: func(ts time.Time, metric pmetric.Metric) {
metric.SetName("test_metric")
metric.SetDataType(pmetric.MetricDataTypeSum)
metric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta)
dp := metric.Sum().DataPoints().AppendEmpty()
dp.SetIntVal(42)
dp.Attributes().InsertString("label_1", "1")
dp.Attributes().InsertString("label_2", "2")
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
},
},
{
name: "Sum",
fillMetric: func(ts time.Time, metric pmetric.Metric) {
metric.SetName("test_metric")
metric.SetDataType(pmetric.MetricDataTypeSum)
metric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta)
dp := metric.Sum().DataPoints().AppendEmpty()
dp.SetDoubleVal(42.42)
dp.Attributes().InsertString("label_1", "1")
dp.Attributes().InsertString("label_2", "2")
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
},
},
{
name: "Histogram",
fillMetric: func(ts time.Time, metric pmetric.Metric) {
Expand Down Expand Up @@ -386,6 +360,178 @@ func TestAccumulateMetrics(t *testing.T) {
}
}

func TestAccumulateDeltaToCumulative(t *testing.T) {
tests := []struct {
name string
metric func(time.Time, time.Time, float64, pmetric.MetricSlice)
}{
{
name: "MonotonicDeltaIntSum",
metric: func(startTimestamp, ts time.Time, v float64, metrics pmetric.MetricSlice) {
metric := metrics.AppendEmpty()
metric.SetName("test_metric")
metric.SetDataType(pmetric.MetricDataTypeSum)
metric.SetDescription("test description")
metric.Sum().SetIsMonotonic(true)
metric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta)
dp := metric.Sum().DataPoints().AppendEmpty()
dp.SetIntVal(int64(v))
dp.Attributes().InsertString("label_1", "1")
dp.Attributes().InsertString("label_2", "2")
dp.SetStartTimestamp(pcommon.NewTimestampFromTime(startTimestamp))
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
},
},
{
name: "MonotonicDeltaSum",
metric: func(startTimestamp, timestamp time.Time, v float64, metrics pmetric.MetricSlice) {
metric := metrics.AppendEmpty()
metric.SetName("test_metric")
metric.SetDataType(pmetric.MetricDataTypeSum)
metric.Sum().SetIsMonotonic(true)
metric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta)
metric.SetDescription("test description")
dp := metric.Sum().DataPoints().AppendEmpty()
dp.SetDoubleVal(v)
dp.Attributes().InsertString("label_1", "1")
dp.Attributes().InsertString("label_2", "2")
dp.SetStartTimestamp(pcommon.NewTimestampFromTime(startTimestamp))
dp.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

ts1 := time.Now().Add(-3 * time.Second)
ts2 := time.Now().Add(-2 * time.Second)
ts3 := time.Now().Add(-1 * time.Second)

resourceMetrics := pmetric.NewResourceMetrics()
ilm := resourceMetrics.ScopeMetrics().AppendEmpty()
ilm.Scope().SetName("test")
a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator)

// The first point arrived
tt.metric(ts1, ts2, 11, ilm.Metrics())
n := a.Accumulate(resourceMetrics)

require.Equal(t, 1, n)

// The next point arrived
tt.metric(ts2, ts3, 31, ilm.Metrics())
n = a.Accumulate(resourceMetrics)

require.Equal(t, 2, n)

mLabels, _, mValue, _, mIsMonotonic := getMetricProperties(ilm.Metrics().At(1))
signature := timeseriesSignature(ilm.Scope().Name(), ilm.Metrics().At(0), mLabels)
m, ok := a.registeredMetrics.Load(signature)
require.True(t, ok)

v := m.(*accumulatedValue)
vLabels, vTS, vValue, vTemporality, vIsMonotonic := getMetricProperties(v.value)

require.Equal(t, v.scope.Name(), "test")
require.Equal(t, v.value.DataType(), ilm.Metrics().At(0).DataType())
require.Equal(t, v.value.DataType(), ilm.Metrics().At(1).DataType())

vLabels.Range(func(k string, v pcommon.Value) bool {
r, _ := mLabels.Get(k)
require.Equal(t, r, v)
return true
})
require.Equal(t, mLabels.Len(), vLabels.Len())
require.Equal(t, mValue, vValue)
require.Equal(t, pmetric.MetricAggregationTemporalityCumulative, vTemporality)
require.Equal(t, mIsMonotonic, vIsMonotonic)

require.Equal(t, ts3.Unix(), vTS.Unix())
})
}
}

func TestAccumulateDroppedMetrics(t *testing.T) {
tests := []struct {
name string
fillMetric func(time.Time, pmetric.Metric)
}{
{
name: "NonMonotonicIntSum",
fillMetric: func(ts time.Time, metric pmetric.Metric) {
metric.SetName("test_metric")
metric.SetDataType(pmetric.MetricDataTypeSum)
metric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta)
metric.Sum().SetIsMonotonic(false)
dp := metric.Sum().DataPoints().AppendEmpty()
dp.SetIntVal(42)
dp.Attributes().InsertString("label_1", "1")
dp.Attributes().InsertString("label_2", "2")
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
},
},
{
name: "NonMonotonicSum",
fillMetric: func(ts time.Time, metric pmetric.Metric) {
metric.SetName("test_metric")
metric.SetDataType(pmetric.MetricDataTypeSum)
metric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta)
metric.Sum().SetIsMonotonic(false)
dp := metric.Sum().DataPoints().AppendEmpty()
dp.SetDoubleVal(42.42)
dp.Attributes().InsertString("label_1", "1")
dp.Attributes().InsertString("label_2", "2")
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
},
},
{
name: "UnspecifiedIntSum",
fillMetric: func(ts time.Time, metric pmetric.Metric) {
metric.SetName("test_metric")
metric.SetDataType(pmetric.MetricDataTypeSum)
metric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityUnspecified)
dp := metric.Sum().DataPoints().AppendEmpty()
dp.SetIntVal(42)
dp.Attributes().InsertString("label_1", "1")
dp.Attributes().InsertString("label_2", "2")
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
},
},
{
name: "UnspecifiedSum",
fillMetric: func(ts time.Time, metric pmetric.Metric) {
metric.SetName("test_metric")
metric.SetDataType(pmetric.MetricDataTypeSum)
metric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityUnspecified)
dp := metric.Sum().DataPoints().AppendEmpty()
dp.SetDoubleVal(42.42)
dp.Attributes().InsertString("label_1", "1")
dp.Attributes().InsertString("label_2", "2")
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
resourceMetrics := pmetric.NewResourceMetrics()
ilm := resourceMetrics.ScopeMetrics().AppendEmpty()
ilm.Scope().SetName("test")
tt.fillMetric(time.Now(), ilm.Metrics().AppendEmpty())

a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator)
n := a.Accumulate(resourceMetrics)
require.Equal(t, 0, n)

signature := timeseriesSignature(ilm.Scope().Name(), ilm.Metrics().At(0), pcommon.NewMap())
v, ok := a.registeredMetrics.Load(signature)
require.False(t, ok)
require.Nil(t, v)
})
}
}

func getMetricProperties(metric pmetric.Metric) (
attributes pcommon.Map,
ts time.Time,
Expand Down
5 changes: 3 additions & 2 deletions receiver/statsdreceiver/protocol/statsd_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ func (p *StatsDParser) GetMetrics() pmetric.Metrics {
)
}

p.lastIntervalTime = timeNowFunc()
p.gauges = make(map[statsDMetricDescription]pmetric.ScopeMetrics)
p.counters = make(map[statsDMetricDescription]pmetric.ScopeMetrics)
p.timersAndDistributions = make([]pmetric.ScopeMetrics, 0)
Expand Down Expand Up @@ -210,7 +209,9 @@ func (p *StatsDParser) Aggregate(line string) error {
case CounterType:
_, ok := p.counters[parsedMetric.description]
if !ok {
p.counters[parsedMetric.description] = buildCounterMetric(parsedMetric, p.isMonotonicCounter, timeNowFunc(), p.lastIntervalTime)
timeNow := timeNowFunc()
p.counters[parsedMetric.description] = buildCounterMetric(parsedMetric, p.isMonotonicCounter, timeNow, p.lastIntervalTime)
p.lastIntervalTime = timeNow
} else {
point := p.counters[parsedMetric.description].Metrics().At(0).Sum().DataPoints().At(0)
point.SetIntVal(point.IntVal() + parsedMetric.counterValue())
Expand Down
10 changes: 5 additions & 5 deletions receiver/statsdreceiver/protocol/statsd_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ func TestStatsDParser_Aggregate(t *testing.T) {
testDescription("statsdTestMetric1", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", 7000, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(611, 0)),
testDescription("statsdTestMetric2", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 50, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(611, 0)),
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 50, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(711, 0)),
},
expectedTimer: []pmetric.ScopeMetrics{},
},
Expand All @@ -614,7 +614,7 @@ func TestStatsDParser_Aggregate(t *testing.T) {
testDescription("statsdTestMetric1", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", 7000, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(611, 0)),
testDescription("statsdTestMetric2", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 50, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(611, 0)),
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 50, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(711, 0)),
},
expectedTimer: []pmetric.ScopeMetrics{},
},
Expand Down Expand Up @@ -642,7 +642,7 @@ func TestStatsDParser_Aggregate(t *testing.T) {
testDescription("statsdTestMetric1", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", 215, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(611, 0)),
testDescription("statsdTestMetric2", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 75, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(611, 0)),
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 75, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(711, 0)),
},
expectedTimer: []pmetric.ScopeMetrics{},
},
Expand Down Expand Up @@ -729,7 +729,7 @@ func TestStatsDParser_AggregateWithMetricType(t *testing.T) {
testDescription("statsdTestMetric1", "c",
[]string{"mykey", "metric_type"}, []string{"myvalue", "counter"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", 7000, false, "c", 0, []string{"mykey", "metric_type"}, []string{"myvalue", "counter"}), false, time.Unix(711, 0), time.Unix(611, 0)),
testDescription("statsdTestMetric2", "c",
[]string{"mykey", "metric_type"}, []string{"myvalue", "counter"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 50, false, "c", 0, []string{"mykey", "metric_type"}, []string{"myvalue", "counter"}), false, time.Unix(711, 0), time.Unix(611, 0)),
[]string{"mykey", "metric_type"}, []string{"myvalue", "counter"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 50, false, "c", 0, []string{"mykey", "metric_type"}, []string{"myvalue", "counter"}), false, time.Unix(711, 0), time.Unix(711, 0)),
},
},
}
Expand Down Expand Up @@ -777,7 +777,7 @@ func TestStatsDParser_AggregateWithIsMonotonicCounter(t *testing.T) {
testDescription("statsdTestMetric1", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", 7000, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), true, time.Unix(711, 0), time.Unix(611, 0)),
testDescription("statsdTestMetric2", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 50, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), true, time.Unix(711, 0), time.Unix(611, 0)),
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 50, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), true, time.Unix(711, 0), time.Unix(711, 0)),
},
},
}
Expand Down

0 comments on commit c522cef

Please sign in to comment.