diff --git a/processor/cumulativetodeltaprocessor/README.md b/processor/cumulativetodeltaprocessor/README.md index 97cb812731d5..1f07daff294c 100644 --- a/processor/cumulativetodeltaprocessor/README.md +++ b/processor/cumulativetodeltaprocessor/README.md @@ -9,7 +9,9 @@ ## Description -The cumulative to delta processor (`cumulativetodeltaprocessor`) converts monotonic, cumulative sum metrics to monotonic, delta sum metrics. Non-monotonic sums are excluded. +The cumulative to delta processor (`cumulativetodeltaprocessor`) converts monotonic, cumulative sum and histogram metrics to monotonic, delta metrics. Non-monotonic sums and exponential histograms are excluded. + +Histogram conversion is currently behind a [feature gate](#feature-gate-configurations) and will only be converted if the feature flag is set. ## Configuration @@ -30,7 +32,7 @@ processors: # processor name: cumulativetodelta cumulativetodelta: - # list the exact cumulative sum metrics to convert to delta + # list the exact cumulative sum or histogram metrics to convert to delta include: metrics: - @@ -46,8 +48,8 @@ processors: # processor name: cumulativetodelta cumulativetodelta: - # Convert cumulative sum metrics to delta - # if and only if 'metric' is in the name + # Convert cumulative sum or histogram metrics to delta + # if and only if 'metric' is in the name include: metrics: - "*metric*" @@ -59,8 +61,8 @@ processors: # processor name: cumulativetodelta cumulativetodelta: - # Convert cumulative sum metrics to delta - # if and only if 'metric' is not in the name + # Convert cumulative sum or histogram metrics to delta + # if and only if 'metric' is not in the name exclude: metrics: - "*metric*" @@ -72,9 +74,17 @@ processors: # processor name: cumulativetodelta cumulativetodelta: # If include/exclude are not specified - # convert all cumulative sum metrics to delta + # convert all cumulative sum or histogram metrics to delta ``` +## Feature gate configurations + +The **processor.cumulativetodeltaprocessor.EnableHistogramSupport** feature flag controls whether cumulative histograms delta conversion is supported or not. It is disabled by default, meaning histograms will not be modified by the processor. If enabled, which histograms are converted is still subjected to the processor's include/exclude filtering. + +Pass `--feature-gates processor.cumulativetodeltaprocessor.EnableHistogramSupport` to enable this feature. + +This feature flag will be removed, and histograms will be enabled by default in release v0.60.0, September 2022. + ## Warnings - [Statefulness](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/standard-warnings.md#statefulness): The cumulativetodelta processor's calculates delta by remembering the previous value of a metric. For this reason, the calculation is only accurate if the metric is continuously sent to the same instance of the collector. As a result, the cumulativetodelta processor may not work as expected if used in a deployment of multiple collectors. When using this processor it is best for the data source to being sending data to a single collector. diff --git a/processor/cumulativetodeltaprocessor/internal/tracking/identity.go b/processor/cumulativetodeltaprocessor/internal/tracking/identity.go index fa590da7b4b1..0c7caa7cd332 100644 --- a/processor/cumulativetodeltaprocessor/internal/tracking/identity.go +++ b/processor/cumulativetodeltaprocessor/internal/tracking/identity.go @@ -32,6 +32,13 @@ type MetricIdentity struct { StartTimestamp pcommon.Timestamp Attributes pcommon.Map MetricValueType pmetric.NumberDataPointValueType + MetricField string +} + +type HistogramIdentities struct { + CountIdentity MetricIdentity + SumIdentity MetricIdentity + BucketIdentities []MetricIdentity } const A = int32('A') @@ -75,6 +82,11 @@ func (mi *MetricIdentity) Write(b *bytes.Buffer) { }) b.WriteByte(SEP) b.WriteString(strconv.FormatInt(int64(mi.StartTimestamp), 36)) + + if mi.MetricField != "" { + b.WriteByte(SEP) + b.WriteString(mi.MetricField) + } } func (mi *MetricIdentity) IsFloatVal() bool { @@ -82,5 +94,5 @@ func (mi *MetricIdentity) IsFloatVal() bool { } func (mi *MetricIdentity) IsSupportedMetricType() bool { - return mi.MetricDataType == pmetric.MetricDataTypeSum + return mi.MetricDataType == pmetric.MetricDataTypeSum || mi.MetricDataType == pmetric.MetricDataTypeHistogram } diff --git a/processor/cumulativetodeltaprocessor/internal/tracking/identity_test.go b/processor/cumulativetodeltaprocessor/internal/tracking/identity_test.go index cd40b13b23af..edb0017ef935 100644 --- a/processor/cumulativetodeltaprocessor/internal/tracking/identity_test.go +++ b/processor/cumulativetodeltaprocessor/internal/tracking/identity_test.go @@ -43,6 +43,7 @@ func TestMetricIdentity_Write(t *testing.T) { StartTimestamp pcommon.Timestamp Attributes pcommon.Map MetricValueType pmetric.NumberDataPointValueType + MetricField string } tests := []struct { name string @@ -72,6 +73,18 @@ func TestMetricIdentity_Write(t *testing.T) { }, want: []string{"C" + SEPSTR + "B", "Y"}, }, + { + name: "histogram sum", + fields: fields{ + Resource: resource, + InstrumentationLibrary: il, + Attributes: attributes, + MetricDataType: pmetric.MetricDataTypeHistogram, + MetricValueType: pmetric.NumberDataPointValueTypeInt, + MetricField: "bound_100", + }, + want: []string{"D" + SEPSTR + "B", "bound_100"}, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -85,6 +98,7 @@ func TestMetricIdentity_Write(t *testing.T) { StartTimestamp: tt.fields.StartTimestamp, Attributes: tt.fields.Attributes, MetricValueType: tt.fields.MetricValueType, + MetricField: tt.fields.MetricField, } b := &bytes.Buffer{} mi.Write(b) @@ -159,6 +173,34 @@ func TestMetricIdentity_IsSupportedMetricType(t *testing.T) { fields: fields{ MetricDataType: pmetric.MetricDataTypeHistogram, }, + want: true, + }, + { + name: "none", + fields: fields{ + MetricDataType: pmetric.MetricDataTypeNone, + }, + want: false, + }, + { + name: "gauge", + fields: fields{ + MetricDataType: pmetric.MetricDataTypeGauge, + }, + want: false, + }, + { + name: "exponential_histogram", + fields: fields{ + MetricDataType: pmetric.MetricDataTypeExponentialHistogram, + }, + want: false, + }, + { + name: "summary", + fields: fields{ + MetricDataType: pmetric.MetricDataTypeSummary, + }, want: false, }, } diff --git a/processor/cumulativetodeltaprocessor/processor.go b/processor/cumulativetodeltaprocessor/processor.go index c57b49b1889f..6bd56e288186 100644 --- a/processor/cumulativetodeltaprocessor/processor.go +++ b/processor/cumulativetodeltaprocessor/processor.go @@ -16,29 +16,46 @@ package cumulativetodeltaprocessor // import "github.com/open-telemetry/opentele import ( "context" + "fmt" "math" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/service/featuregate" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterset" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor/internal/tracking" ) +const enableHistogramSupportGateID = "processor.cumulativetodeltaprocessor.EnableHistogramSupport" + +var enableHistogramSupportGate = featuregate.Gate{ + ID: enableHistogramSupportGateID, + Enabled: false, + Description: "wip", +} + +func init() { + featuregate.GetRegistry().MustRegister(enableHistogramSupportGate) +} + type cumulativeToDeltaProcessor struct { - includeFS filterset.FilterSet - excludeFS filterset.FilterSet - logger *zap.Logger - deltaCalculator *tracking.MetricTracker - cancelFunc context.CancelFunc + includeFS filterset.FilterSet + excludeFS filterset.FilterSet + logger *zap.Logger + deltaCalculator *tracking.MetricTracker + cancelFunc context.CancelFunc + histogramSupportEnabled bool } func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) *cumulativeToDeltaProcessor { ctx, cancel := context.WithCancel(context.Background()) p := &cumulativeToDeltaProcessor{ - logger: logger, - deltaCalculator: tracking.NewMetricTracker(ctx, logger, config.MaxStaleness), - cancelFunc: cancel, + logger: logger, + deltaCalculator: tracking.NewMetricTracker(ctx, logger, config.MaxStaleness), + cancelFunc: cancel, + histogramSupportEnabled: featuregate.GetRegistry().IsEnabled(enableHistogramSupportGateID), } if len(config.Include.Metrics) > 0 { p.includeFS, _ = filterset.CreateFilterSet(config.Include.Metrics, &config.Include.Config) @@ -81,6 +98,47 @@ func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pme MetricIsMonotonic: ms.IsMonotonic(), } ctdp.convertDataPoints(ms.DataPoints(), baseIdentity) + ms.SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta) + return ms.DataPoints().Len() == 0 + case pmetric.MetricDataTypeHistogram: + if !ctdp.histogramSupportEnabled { + return false + } + + ms := m.Histogram() + if ms.AggregationTemporality() != pmetric.MetricAggregationTemporalityCumulative { + return false + } + + if ms.DataPoints().Len() == 0 { + return false + } + + countIdentity := tracking.MetricIdentity{ + Resource: rm.Resource(), + InstrumentationLibrary: ilm.Scope(), + MetricDataType: m.DataType(), + MetricName: m.Name(), + MetricUnit: m.Unit(), + MetricIsMonotonic: true, + MetricValueType: pmetric.NumberDataPointValueTypeInt, + MetricField: "count", + } + + sumIdentity := countIdentity + sumIdentity.MetricField = "sum" + sumIdentity.MetricValueType = pmetric.NumberDataPointValueTypeDouble + + bucketIdentities := makeBucketIdentities(countIdentity, ms.DataPoints().At(0)) + + histogramIdentities := tracking.HistogramIdentities{ + CountIdentity: countIdentity, + SumIdentity: sumIdentity, + BucketIdentities: bucketIdentities, + } + + ctdp.convertHistogramDataPoints(ms.DataPoints(), &histogramIdentities) + ms.SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta) return ms.DataPoints().Len() == 0 default: @@ -94,6 +152,19 @@ func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pme return md, nil } +func makeBucketIdentities(baseIdentity tracking.MetricIdentity, dp pmetric.HistogramDataPoint) []tracking.MetricIdentity { + numBuckets := dp.BucketCounts().Len() + bucketIdentities := make([]tracking.MetricIdentity, numBuckets) + + for index := 0; index < numBuckets; index++ { + bucketIdentity := baseIdentity + bucketIdentity.MetricField = fmt.Sprintf("bucket_%d", index) + bucketIdentities[index] = bucketIdentity + } + + return bucketIdentities +} + func (ctdp *cumulativeToDeltaProcessor) shutdown(context.Context) error { ctdp.cancelFunc() return nil @@ -104,6 +175,32 @@ func (ctdp *cumulativeToDeltaProcessor) shouldConvertMetric(metricName string) b (ctdp.excludeFS == nil || !ctdp.excludeFS.Matches(metricName)) } +func (ctdp *cumulativeToDeltaProcessor) convertHistogramFloatValue(id tracking.MetricIdentity, dp pmetric.HistogramDataPoint, value float64) (tracking.DeltaValue, bool) { + id.StartTimestamp = dp.StartTimestamp() + id.Attributes = dp.Attributes() + trackingPoint := tracking.MetricPoint{ + Identity: id, + Value: tracking.ValuePoint{ + ObservedTimestamp: dp.Timestamp(), + FloatValue: value, + }, + } + return ctdp.deltaCalculator.Convert(trackingPoint) +} + +func (ctdp *cumulativeToDeltaProcessor) convertHistogramIntValue(id tracking.MetricIdentity, dp pmetric.HistogramDataPoint, value int64) (tracking.DeltaValue, bool) { + id.StartTimestamp = dp.StartTimestamp() + id.Attributes = dp.Attributes() + trackingPoint := tracking.MetricPoint{ + Identity: id, + Value: tracking.ValuePoint{ + ObservedTimestamp: dp.Timestamp(), + IntValue: value, + }, + } + return ctdp.deltaCalculator.Convert(trackingPoint) +} + func (ctdp *cumulativeToDeltaProcessor) convertDataPoints(in interface{}, baseIdentity tracking.MetricIdentity) { if dps, ok := in.(pmetric.NumberDataPointSlice); ok { @@ -146,3 +243,42 @@ func (ctdp *cumulativeToDeltaProcessor) convertDataPoints(in interface{}, baseId }) } } + +func (ctdp *cumulativeToDeltaProcessor) convertHistogramDataPoints(in interface{}, baseIdentities *tracking.HistogramIdentities) { + + if dps, ok := in.(pmetric.HistogramDataPointSlice); ok { + dps.RemoveIf(func(dp pmetric.HistogramDataPoint) bool { + countID := baseIdentities.CountIdentity + countDelta, countValid := ctdp.convertHistogramIntValue(countID, dp, int64(dp.Count())) + + hasSum := dp.HasSum() && !math.IsNaN(dp.Sum()) + sumDelta, sumValid := tracking.DeltaValue{}, true + + if hasSum { + sumID := baseIdentities.SumIdentity + sumDelta, sumValid = ctdp.convertHistogramFloatValue(sumID, dp, dp.Sum()) + } + + bucketsValid := true + rawBucketCounts := dp.BucketCounts().AsRaw() + for index := 0; index < len(rawBucketCounts); index++ { + bucketID := baseIdentities.BucketIdentities[index] + bucketDelta, bucketValid := ctdp.convertHistogramIntValue(bucketID, dp, int64(rawBucketCounts[index])) + rawBucketCounts[index] = uint64(bucketDelta.IntValue) + bucketsValid = bucketsValid && bucketValid + } + + if countValid && sumValid && bucketsValid { + dp.SetStartTimestamp(countDelta.StartTimestamp) + dp.SetCount(uint64(countDelta.IntValue)) + if hasSum { + dp.SetSum(sumDelta.FloatValue) + } + dp.SetBucketCounts(pcommon.NewImmutableUInt64Slice(rawBucketCounts)) + return false + } + + return true + }) + } +} diff --git a/processor/cumulativetodeltaprocessor/processor_test.go b/processor/cumulativetodeltaprocessor/processor_test.go index f1825f8fb534..14c15ba84b13 100644 --- a/processor/cumulativetodeltaprocessor/processor_test.go +++ b/processor/cumulativetodeltaprocessor/processor_test.go @@ -28,23 +28,33 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/service/featuregate" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterset" ) -type testMetric struct { +type testSumMetric struct { metricNames []string metricValues [][]float64 isCumulative []bool } +type testHistogramMetric struct { + metricNames []string + metricCounts [][]uint64 + metricSums [][]float64 + metricBuckets [][][]uint64 + isCumulative []bool +} + type cumulativeToDeltaTest struct { - name string - include MatchMetrics - exclude MatchMetrics - inMetrics pmetric.Metrics - outMetrics pmetric.Metrics + name string + include MatchMetrics + exclude MatchMetrics + inMetrics pmetric.Metrics + outMetrics pmetric.Metrics + histogramSupportEnabled bool } var ( @@ -58,12 +68,12 @@ var ( RegexpConfig: nil, }, }, - inMetrics: generateTestMetrics(testMetric{ + inMetrics: generateTestSumMetrics(testSumMetric{ metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100}, {4}}, isCumulative: []bool{true, true}, }), - outMetrics: generateTestMetrics(testMetric{ + outMetrics: generateTestSumMetrics(testSumMetric{ metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100}, {4}}, isCumulative: []bool{true, true}, @@ -78,12 +88,12 @@ var ( RegexpConfig: nil, }, }, - inMetrics: generateTestMetrics(testMetric{ + inMetrics: generateTestSumMetrics(testSumMetric{ metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100, 200, 500}, {4}}, isCumulative: []bool{true, true}, }), - outMetrics: generateTestMetrics(testMetric{ + outMetrics: generateTestSumMetrics(testSumMetric{ metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100, 100, 300}, {4}}, isCumulative: []bool{false, true}, @@ -98,12 +108,12 @@ var ( RegexpConfig: nil, }, }, - inMetrics: generateTestMetrics(testMetric{ + inMetrics: generateTestSumMetrics(testSumMetric{ metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100, 200, math.NaN()}, {4}}, isCumulative: []bool{true, true}, }), - outMetrics: generateTestMetrics(testMetric{ + outMetrics: generateTestSumMetrics(testSumMetric{ metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100, 100, math.NaN()}, {4}}, isCumulative: []bool{false, true}, @@ -125,23 +135,151 @@ var ( RegexpConfig: nil, }, }, - inMetrics: generateTestMetrics(testMetric{ + inMetrics: generateTestSumMetrics(testSumMetric{ metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100}, {4}}, isCumulative: []bool{true, true}, }), - outMetrics: generateTestMetrics(testMetric{ + outMetrics: generateTestSumMetrics(testSumMetric{ metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100}, {4}}, isCumulative: []bool{true, true}, }), }, + { + name: "cumulative_to_delta_histogram_one_positive", + include: MatchMetrics{ + Metrics: []string{"metric_1"}, + Config: filterset.Config{ + MatchType: "strict", + RegexpConfig: nil, + }, + }, + inMetrics: generateTestHistogramMetrics(testHistogramMetric{ + metricNames: []string{"metric_1", "metric_2"}, + metricCounts: [][]uint64{{100, 200, 500}, {4}}, + metricSums: [][]float64{{100, 200, 500}, {4}}, + metricBuckets: [][][]uint64{ + {{50, 25, 25}, {100, 50, 50}, {250, 125, 125}}, + {{4, 4, 4}}, + }, + isCumulative: []bool{true, true}, + }), + outMetrics: generateTestHistogramMetrics(testHistogramMetric{ + metricNames: []string{"metric_1", "metric_2"}, + metricCounts: [][]uint64{{100, 100, 300}, {4}}, + metricSums: [][]float64{{100, 100, 300}, {4}}, + metricBuckets: [][][]uint64{ + {{50, 25, 25}, {50, 25, 25}, {150, 75, 75}}, + {{4, 4, 4}}, + }, + isCumulative: []bool{false, true}, + }), + histogramSupportEnabled: true, + }, + { + name: "cumulative_to_delta_histogram_nan_sum", + include: MatchMetrics{ + Metrics: []string{"metric_1"}, + Config: filterset.Config{ + MatchType: "strict", + RegexpConfig: nil, + }, + }, + inMetrics: generateTestHistogramMetrics(testHistogramMetric{ + metricNames: []string{"metric_1", "metric_2"}, + metricCounts: [][]uint64{{100, 200, 500}, {4}}, + metricSums: [][]float64{{100, math.NaN(), 500}, {4}}, + metricBuckets: [][][]uint64{ + {{50, 25, 25}, {100, 50, 50}, {250, 125, 125}}, + {{4, 4, 4}}, + }, + isCumulative: []bool{true, true}, + }), + outMetrics: generateTestHistogramMetrics(testHistogramMetric{ + metricNames: []string{"metric_1", "metric_2"}, + metricCounts: [][]uint64{{100, 100, 300}, {4}}, + metricSums: [][]float64{{100, math.NaN(), 400}, {4}}, + metricBuckets: [][][]uint64{ + {{50, 25, 25}, {50, 25, 25}, {150, 75, 75}}, + {{4, 4, 4}}, + }, + isCumulative: []bool{false, true}, + }), + histogramSupportEnabled: true, + }, + { + name: "cumulative_to_delta_histogram_one_positive_without_sums", + include: MatchMetrics{ + Metrics: []string{"metric_1"}, + Config: filterset.Config{ + MatchType: "strict", + RegexpConfig: nil, + }, + }, + inMetrics: generateTestHistogramMetrics(testHistogramMetric{ + metricNames: []string{"metric_1", "metric_2"}, + metricCounts: [][]uint64{{100, 200, 500}, {4}}, + metricSums: [][]float64{{}, {4}}, + metricBuckets: [][][]uint64{ + {{50, 25, 25}, {100, 50, 50}, {250, 125, 125}}, + {{4, 4, 4}}, + }, + isCumulative: []bool{true, true}, + }), + outMetrics: generateTestHistogramMetrics(testHistogramMetric{ + metricNames: []string{"metric_1", "metric_2"}, + metricCounts: [][]uint64{{100, 100, 300}, {4}}, + metricSums: [][]float64{{}, {4}}, + metricBuckets: [][][]uint64{ + {{50, 25, 25}, {50, 25, 25}, {150, 75, 75}}, + {{4, 4, 4}}, + }, + isCumulative: []bool{false, true}, + }), + histogramSupportEnabled: true, + }, + { + name: "cumulative_to_delta_histogram_ignored_without_feature", + include: MatchMetrics{ + Metrics: []string{"metric_1"}, + Config: filterset.Config{ + MatchType: "strict", + RegexpConfig: nil, + }, + }, + inMetrics: generateTestHistogramMetrics(testHistogramMetric{ + metricNames: []string{"metric_1", "metric_2"}, + metricCounts: [][]uint64{{100, 200, 500}, {4}}, + metricSums: [][]float64{{100, 200, 500}, {4}}, + metricBuckets: [][][]uint64{ + {{50, 25, 25}, {100, 50, 50}, {250, 125, 125}}, + {{4, 4, 4}}, + }, + isCumulative: []bool{true, true}, + }), + outMetrics: generateTestHistogramMetrics(testHistogramMetric{ + metricNames: []string{"metric_1", "metric_2"}, + metricCounts: [][]uint64{{100, 200, 500}, {4}}, + metricSums: [][]float64{{100, 200, 500}, {4}}, + metricBuckets: [][][]uint64{ + {{50, 25, 25}, {100, 50, 50}, {250, 125, 125}}, + {{4, 4, 4}}, + }, + isCumulative: []bool{true, true}, + }), + histogramSupportEnabled: false, + }, } ) func TestCumulativeToDeltaProcessor(t *testing.T) { for _, test := range testCases { t.Run(test.name, func(t *testing.T) { + registry := featuregate.GetRegistry() + registry.Apply(map[string]bool{ + enableHistogramSupportGateID: test.histogramSupportEnabled, + }) // next stores the results of the filter metric processor next := new(consumertest.MetricsSink) cfg := &Config{ @@ -208,6 +346,24 @@ func TestCumulativeToDeltaProcessor(t *testing.T) { } } + if eM.DataType() == pmetric.MetricDataTypeHistogram { + eDataPoints := eM.Histogram().DataPoints() + aDataPoints := aM.Histogram().DataPoints() + + require.Equal(t, eDataPoints.Len(), aDataPoints.Len()) + require.Equal(t, eM.Histogram().AggregationTemporality(), aM.Histogram().AggregationTemporality()) + + for j := 0; j < eDataPoints.Len(); j++ { + require.Equal(t, eDataPoints.At(j).Count(), aDataPoints.At(j).Count()) + require.Equal(t, eDataPoints.At(j).HasSum(), aDataPoints.At(j).HasSum()) + if math.IsNaN(eDataPoints.At(j).Sum()) { + require.True(t, math.IsNaN(aDataPoints.At(j).Sum())) + } else { + require.Equal(t, eDataPoints.At(j).Sum(), aDataPoints.At(j).Sum()) + } + require.Equal(t, eDataPoints.At(j).BucketCounts().AsRaw(), aDataPoints.At(j).BucketCounts().AsRaw()) + } + } } require.NoError(t, mgp.Shutdown(ctx)) @@ -215,7 +371,7 @@ func TestCumulativeToDeltaProcessor(t *testing.T) { } } -func generateTestMetrics(tm testMetric) pmetric.Metrics { +func generateTestSumMetrics(tm testSumMetric) pmetric.Metrics { md := pmetric.NewMetrics() now := time.Now() @@ -245,6 +401,41 @@ func generateTestMetrics(tm testMetric) pmetric.Metrics { return md } +func generateTestHistogramMetrics(tm testHistogramMetric) pmetric.Metrics { + md := pmetric.NewMetrics() + now := time.Now() + + rm := md.ResourceMetrics().AppendEmpty() + ms := rm.ScopeMetrics().AppendEmpty().Metrics() + for i, name := range tm.metricNames { + m := ms.AppendEmpty() + m.SetName(name) + m.SetDataType(pmetric.MetricDataTypeHistogram) + + hist := m.Histogram() + + if tm.isCumulative[i] { + hist.SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative) + } else { + hist.SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta) + } + + for index, count := range tm.metricCounts[i] { + dp := m.Histogram().DataPoints().AppendEmpty() + dp.SetTimestamp(pcommon.NewTimestampFromTime(now.Add(10 * time.Second))) + dp.SetCount(count) + + sums := tm.metricSums[i] + if len(sums) > 0 { + dp.SetSum(sums[index]) + } + dp.SetBucketCounts(pcommon.NewImmutableUInt64Slice(tm.metricBuckets[i][index])) + } + } + + return md +} + func BenchmarkConsumeMetrics(b *testing.B) { c := consumertest.NewNop() params := component.ProcessorCreateSettings{ diff --git a/unreleased/issue-12423.yaml b/unreleased/issue-12423.yaml new file mode 100755 index 000000000000..88318fa60605 --- /dev/null +++ b/unreleased/issue-12423.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: cumulativetodeltaprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Cumulative Histogram metrics will now be converted to delta temporality and are no longer skipped. + +# One or more tracking issues related to the change +issues: [12423] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: This means that consumers who currently rely on Histograms being skipped would need to ensure they are excluded via config.