Skip to content

Commit

Permalink
[processor/cumulativetodelta] drop data points with NoRecordedValue (#…
Browse files Browse the repository at this point in the history
…18766)

[processor/cumulativetodelta] drop data points with NoRecordedValue flag set
  • Loading branch information
seankhliao authored Feb 21, 2023
1 parent 284a73f commit 63c3f2e
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 3 deletions.
16 changes: 16 additions & 0 deletions .chloggen/cumulativetodelta-novaluerecorded.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: cumulativetodeltaprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Data points with the NoValueRecorded flag set are no longer considered in calculating deltas.

# One or more tracking issues related to the change
issues: [18766]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
12 changes: 10 additions & 2 deletions processor/cumulativetodeltaprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ func (ctdp *cumulativeToDeltaProcessor) shouldConvertMetric(metricName string) b
}

func (ctdp *cumulativeToDeltaProcessor) convertDataPoints(in interface{}, baseIdentity tracking.MetricIdentity) {

if dps, ok := in.(pmetric.NumberDataPointSlice); ok {
dps.RemoveIf(func(dp pmetric.NumberDataPoint) bool {
id := baseIdentity
Expand All @@ -151,6 +150,11 @@ func (ctdp *cumulativeToDeltaProcessor) convertDataPoints(in interface{}, baseId
point := tracking.ValuePoint{
ObservedTimestamp: dp.Timestamp(),
}

if dp.Flags().NoRecordedValue() {
// drop points with no value
return true
}
if id.IsFloatVal() {
// Do not attempt to transform NaN values
if math.IsNaN(dp.DoubleValue()) {
Expand Down Expand Up @@ -180,13 +184,17 @@ func (ctdp *cumulativeToDeltaProcessor) convertDataPoints(in interface{}, baseId
}

func (ctdp *cumulativeToDeltaProcessor) convertHistogramDataPoints(in interface{}, baseIdentity tracking.MetricIdentity) {

if dps, ok := in.(pmetric.HistogramDataPointSlice); ok {
dps.RemoveIf(func(dp pmetric.HistogramDataPoint) bool {
id := baseIdentity
id.StartTimestamp = dp.StartTimestamp()
id.Attributes = dp.Attributes()

if dp.Flags().NoRecordedValue() {
// drop points with no value
return true
}

point := tracking.ValuePoint{
ObservedTimestamp: dp.Timestamp(),
HistogramValue: &tracking.HistogramPoint{
Expand Down
71 changes: 70 additions & 1 deletion processor/cumulativetodeltaprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,17 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterset"
)

var (
zeroFlag = pmetric.DefaultDataPointFlags
noValueFlag = pmetric.DefaultDataPointFlags.WithNoRecordedValue(true)
)

type testSumMetric struct {
metricNames []string
metricValues [][]float64
isCumulative []bool
isMonotonic []bool
flags [][]pmetric.DataPointFlags
}

type testHistogramMetric struct {
Expand All @@ -48,6 +54,7 @@ type testHistogramMetric struct {
metricMaxes [][]float64
metricBuckets [][][]uint64
isCumulative []bool
flags [][]pmetric.DataPointFlags
}

type cumulativeToDeltaTest struct {
Expand Down Expand Up @@ -126,6 +133,29 @@ func TestCumulativeToDeltaProcessor(t *testing.T) {
isMonotonic: []bool{true, true},
}),
},
{
name: "cumulative_to_delta_nodata",
inMetrics: generateTestSumMetrics(testSumMetric{
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{0, 100, 0, 200, 400}, {0, 100, 0, 0, 400}},
isCumulative: []bool{true, true},
isMonotonic: []bool{true, true},
flags: [][]pmetric.DataPointFlags{
{zeroFlag, zeroFlag, noValueFlag, zeroFlag, zeroFlag},
{zeroFlag, zeroFlag, noValueFlag, noValueFlag, zeroFlag},
},
}),
outMetrics: generateTestSumMetrics(testSumMetric{
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100, 100, 200}, {100, 300}},
isCumulative: []bool{false, false},
isMonotonic: []bool{true, true},
flags: [][]pmetric.DataPointFlags{
{zeroFlag, zeroFlag, zeroFlag},
{zeroFlag, zeroFlag},
},
}),
},
{
name: "cumulative_to_delta_exclude_precedence",
include: MatchMetrics{
Expand Down Expand Up @@ -261,6 +291,37 @@ func TestCumulativeToDeltaProcessor(t *testing.T) {
isCumulative: []bool{false, true},
}),
},
{
name: "cumulative_to_delta_histogram_novalue",
inMetrics: generateTestHistogramMetrics(testHistogramMetric{
metricNames: []string{"metric_1", "metric_2"},
metricCounts: [][]uint64{{0, 100, 0, 500}, {0, 2, 0, 0, 16}},
metricSums: [][]float64{{0, 100, 0, 500}, {0, 3, 0, 0, 81}},
metricBuckets: [][][]uint64{
{{0, 0, 0}, {50, 25, 25}, {0, 0, 0}, {250, 125, 125}},
{{0, 0, 0}, {1, 1, 1}, {0, 0, 0}, {0, 0, 0}, {21, 40, 20}},
},
isCumulative: []bool{true, true},
flags: [][]pmetric.DataPointFlags{
{zeroFlag, zeroFlag, noValueFlag, zeroFlag},
{zeroFlag, zeroFlag, noValueFlag, noValueFlag, zeroFlag},
},
}),
outMetrics: generateTestHistogramMetrics(testHistogramMetric{
metricNames: []string{"metric_1", "metric_2"},
metricCounts: [][]uint64{{100, 400}, {2, 14}},
metricSums: [][]float64{{100, 400}, {3, 78}},
metricBuckets: [][][]uint64{
{{50, 25, 25}, {200, 100, 100}},
{{1, 1, 1}, {20, 39, 19}},
},
isCumulative: []bool{false, false},
flags: [][]pmetric.DataPointFlags{
{zeroFlag, zeroFlag},
{zeroFlag, zeroFlag},
},
}),
},
{
name: "cumulative_to_delta_histogram_one_positive_without_sums",
include: MatchMetrics{
Expand Down Expand Up @@ -430,6 +491,7 @@ func TestCumulativeToDeltaProcessor(t *testing.T) {
} else {
require.Equal(t, eDataPoints.At(j).DoubleValue(), aDataPoints.At(j).DoubleValue())
}
require.Equal(t, eDataPoints.At(j).Flags(), aDataPoints.At(j).Flags())
}
}

Expand All @@ -451,6 +513,7 @@ func TestCumulativeToDeltaProcessor(t *testing.T) {
require.Equal(t, eDataPoints.At(j).Sum(), aDataPoints.At(j).Sum())
}
require.Equal(t, eDataPoints.At(j).BucketCounts(), aDataPoints.At(j).BucketCounts())
require.Equal(t, eDataPoints.At(j).Flags(), aDataPoints.At(j).Flags())
}
}
}
Expand Down Expand Up @@ -478,10 +541,13 @@ func generateTestSumMetrics(tm testSumMetric) pmetric.Metrics {
sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
}

for _, value := range tm.metricValues[i] {
for index, value := range tm.metricValues[i] {
dp := m.Sum().DataPoints().AppendEmpty()
dp.SetTimestamp(pcommon.NewTimestampFromTime(now.Add(10 * time.Second)))
dp.SetDoubleValue(value)
if len(tm.flags) > i && len(tm.flags[i]) > index {
dp.SetFlags(tm.flags[i][index])
}
}
}

Expand Down Expand Up @@ -527,6 +593,9 @@ func generateTestHistogramMetrics(tm testHistogramMetric) pmetric.Metrics {
}
}
dp.BucketCounts().FromRaw(tm.metricBuckets[i][index])
if len(tm.flags) > i && len(tm.flags[i]) > index {
dp.SetFlags(tm.flags[i][index])
}
}
}

Expand Down

0 comments on commit 63c3f2e

Please sign in to comment.