From 24c666da4a5a3c7cb9aa2704fbb5e8a6e76500e9 Mon Sep 17 00:00:00 2001
From: Vi <1149443+mistodon@users.noreply.github.com>
Date: Mon, 18 Jul 2022 13:34:08 +0100
Subject: [PATCH] Implement cumulative-to-delta over histograms

---
 .../internal/tracking/identity.go             |   8 +-
 .../internal/tracking/identity_test.go        |  21 ++++
 .../cumulativetodeltaprocessor/processor.go   | 112 ++++++++++++++
 3 files changed, 140 insertions(+), 1 deletion(-)

diff --git a/processor/cumulativetodeltaprocessor/internal/tracking/identity.go b/processor/cumulativetodeltaprocessor/internal/tracking/identity.go
index fa590da7b4b1..784622291063 100644
--- a/processor/cumulativetodeltaprocessor/internal/tracking/identity.go
+++ b/processor/cumulativetodeltaprocessor/internal/tracking/identity.go
@@ -32,6 +32,7 @@ type MetricIdentity struct {
     StartTimestamp         pcommon.Timestamp
     Attributes             pcommon.Map
     MetricValueType        pmetric.NumberDataPointValueType
+    MetricField            string
 }
 
 const A = int32('A')
@@ -75,6 +76,11 @@ func (mi *MetricIdentity) Write(b *bytes.Buffer) {
     })
     b.WriteByte(SEP)
     b.WriteString(strconv.FormatInt(int64(mi.StartTimestamp), 36))
+
+    if mi.MetricField != "" {
+        b.WriteByte(SEP)
+        b.WriteString(mi.MetricField)
+    }
 }
 
 func (mi *MetricIdentity) IsFloatVal() bool {
@@ -82,5 +88,5 @@ func (mi *MetricIdentity) IsFloatVal() bool {
 }
 
 func (mi *MetricIdentity) IsSupportedMetricType() bool {
-    return mi.MetricDataType == pmetric.MetricDataTypeSum
+    return mi.MetricDataType == pmetric.MetricDataTypeSum || mi.MetricDataType == pmetric.MetricDataTypeHistogram
 }
diff --git a/processor/cumulativetodeltaprocessor/internal/tracking/identity_test.go b/processor/cumulativetodeltaprocessor/internal/tracking/identity_test.go
index cd40b13b23af..26b2223ea834 100644
--- a/processor/cumulativetodeltaprocessor/internal/tracking/identity_test.go
+++ b/processor/cumulativetodeltaprocessor/internal/tracking/identity_test.go
@@ -43,6 +43,7 @@ func TestMetricIdentity_Write(t *testing.T) {
         StartTimestamp         pcommon.Timestamp
         Attributes             pcommon.Map
         MetricValueType        pmetric.NumberDataPointValueType
+        MetricField            string
     }
     tests := []struct {
         name   string
@@ -72,6 +73,18 @@ func TestMetricIdentity_Write(t *testing.T) {
             },
             want: []string{"C" + SEPSTR + "B", "Y"},
         },
+        {
+            name: "histogram sum",
+            fields: fields{
+                Resource:               resource,
+                InstrumentationLibrary: il,
+                Attributes:             attributes,
+                MetricDataType:         pmetric.MetricDataTypeHistogram,
+                MetricValueType:        pmetric.NumberDataPointValueTypeInt,
+                MetricField:            "bound_100",
+            },
+            want: []string{"D" + SEPSTR + "B", "bound_100"},
+        },
     }
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
@@ -85,6 +98,7 @@ func TestMetricIdentity_Write(t *testing.T) {
                 StartTimestamp:         tt.fields.StartTimestamp,
                 Attributes:             tt.fields.Attributes,
                 MetricValueType:        tt.fields.MetricValueType,
+                MetricField:            tt.fields.MetricField,
             }
             b := &bytes.Buffer{}
             mi.Write(b)
@@ -159,6 +173,13 @@ func TestMetricIdentity_IsSupportedMetricType(t *testing.T) {
             fields: fields{
                 MetricDataType: pmetric.MetricDataTypeHistogram,
             },
+            want: true,
+        },
+        {
+            name: "gauge",
+            fields: fields{
+                MetricDataType: pmetric.MetricDataTypeGauge,
+            },
             want: false,
         },
     }
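The tracking change above is the heart of the patch: a cumulative histogram is fanned out into one tracked identity per component (count, sum, and one per bucket), and the new MetricField suffix is what keeps those identities distinct once Write serializes them. A minimal standalone sketch of that idea follows; the identityKey helper and ';' separator are illustrative stand-ins, not the real MetricIdentity.Write, which also encodes resource, scope, attributes, value type, and start timestamp:

package main

import (
    "bytes"
    "fmt"
)

// identityKey mimics the suffixing scheme: a per-field suffix written after
// a separator byte gives each histogram component its own state key.
func identityKey(metricName, field string) string {
    const sep byte = ';' // stand-in for the package's SEP constant
    b := &bytes.Buffer{}
    b.WriteString(metricName)
    if field != "" {
        b.WriteByte(sep)
        b.WriteString(field)
    }
    return b.String()
}

func main() {
    seen := map[string]bool{}
    for _, field := range []string{"count", "sum", "bucket_0", "bucket_1"} {
        seen[identityKey("http.server.duration", field)] = true
    }
    fmt.Println(len(seen)) // 4: one independent delta state per component
}

Because an empty MetricField writes nothing, identity keys for plain sums are unchanged and existing tracker state remains compatible.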
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor" import ( + "bytes" "context" + "fmt" "math" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" @@ -89,6 +92,47 @@ func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pme MetricIsMonotonic: ms.IsMonotonic(), } ctdp.convertDataPoints(ms.DataPoints(), baseIdentity) + ms.SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta) + return ms.DataPoints().Len() == 0 + case pmetric.MetricDataTypeHistogram: + ms := m.Histogram() + if ms.AggregationTemporality() != pmetric.MetricAggregationTemporalityCumulative { + return false + } + + histogramIdentities := make([]tracking.MetricIdentity, 0, 16) + + countIdentity := tracking.MetricIdentity{ + Resource: rm.Resource(), + InstrumentationLibrary: ilm.Scope(), + MetricDataType: m.DataType(), + MetricName: m.Name(), + MetricUnit: m.Unit(), + MetricIsMonotonic: true, + MetricValueType: pmetric.NumberDataPointValueTypeInt, + MetricField: "count", + } + + sumIdentity := countIdentity + sumIdentity.MetricField = "sum" + sumIdentity.MetricValueType = pmetric.NumberDataPointValueTypeDouble + + histogramIdentities = append(histogramIdentities, countIdentity, sumIdentity) + + if ms.DataPoints().Len() == 0 { + return false + } + + firstDataPoint := ms.DataPoints().At(0) + for index := 0; index < firstDataPoint.BucketCounts().Len(); index++ { + metricField := fmt.Sprintf("bucket_%d", index) + bucketIdentity := countIdentity + bucketIdentity.MetricField = metricField + histogramIdentities = append(histogramIdentities, bucketIdentity) + } + + ctdp.convertHistogramDataPoints(ms.DataPoints(), &histogramIdentities) + ms.SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta) return ms.DataPoints().Len() == 0 default: @@ -159,3 +203,75 @@ func (ctdp *cumulativeToDeltaProcessor) convertDataPoints(in interface{}, baseId }) } } + +func (ctdp *cumulativeToDeltaProcessor) convertHistogramDataPoints(in interface{}, baseIdentities *[]tracking.MetricIdentity) { + + if dps, ok := in.(pmetric.HistogramDataPointSlice); ok { + dps.RemoveIf(func(dp pmetric.HistogramDataPoint) bool { + countId := (*baseIdentities)[0] + countId.StartTimestamp = dp.StartTimestamp() + countId.Attributes = dp.Attributes() + countPoint := tracking.MetricPoint{ + Identity: countId, + Value: tracking.ValuePoint{ + ObservedTimestamp: dp.Timestamp(), + IntValue: int64(dp.Count()), + }, + } + countDelta, countValid := ctdp.deltaCalculator.Convert(countPoint) + if !countValid { + return true + } + + dp.SetCount(uint64(countDelta.IntValue)) + + if dp.HasSum() { + sumId := (*baseIdentities)[1] + sumId.StartTimestamp = dp.StartTimestamp() + sumId.Attributes = dp.Attributes() + sumPoint := tracking.MetricPoint{ + Identity: sumId, + Value: tracking.ValuePoint{ + ObservedTimestamp: dp.Timestamp(), + FloatValue: dp.Sum(), + }, + } + sumDelta, sumValid := ctdp.deltaCalculator.Convert(sumPoint) + if !sumValid { + return true + } + + dp.SetSum(sumDelta.FloatValue) + } + + firstBucketIndex := 2 + rawCounts := dp.BucketCounts().AsRaw() + for index := 0; index < len(rawCounts); index++ { + bucketId := (*baseIdentities)[firstBucketIndex+index] + bucketId.StartTimestamp = dp.StartTimestamp() + bucketId.Attributes = dp.Attributes() + testBytes := &bytes.Buffer{} + bucketId.Write(testBytes) + fmt.Println(testBytes.String()) + bucketPoint := tracking.MetricPoint{ + Identity: bucketId, + Value: tracking.ValuePoint{ + 
@@ -159,3 +202,72 @@ func (ctdp *cumulativeToDeltaProcessor) convertDataPoints(in interface{}, baseId
         })
     }
 }
+
+func (ctdp *cumulativeToDeltaProcessor) convertHistogramDataPoints(in interface{}, baseIdentities *[]tracking.MetricIdentity) {
+
+    if dps, ok := in.(pmetric.HistogramDataPointSlice); ok {
+        dps.RemoveIf(func(dp pmetric.HistogramDataPoint) bool {
+            countId := (*baseIdentities)[0]
+            countId.StartTimestamp = dp.StartTimestamp()
+            countId.Attributes = dp.Attributes()
+            countPoint := tracking.MetricPoint{
+                Identity: countId,
+                Value: tracking.ValuePoint{
+                    ObservedTimestamp: dp.Timestamp(),
+                    IntValue:          int64(dp.Count()),
+                },
+            }
+            countDelta, countValid := ctdp.deltaCalculator.Convert(countPoint)
+            if !countValid {
+                return true
+            }
+
+            dp.SetCount(uint64(countDelta.IntValue))
+
+            if dp.HasSum() {
+                sumId := (*baseIdentities)[1]
+                sumId.StartTimestamp = dp.StartTimestamp()
+                sumId.Attributes = dp.Attributes()
+                sumPoint := tracking.MetricPoint{
+                    Identity: sumId,
+                    Value: tracking.ValuePoint{
+                        ObservedTimestamp: dp.Timestamp(),
+                        FloatValue:        dp.Sum(),
+                    },
+                }
+                sumDelta, sumValid := ctdp.deltaCalculator.Convert(sumPoint)
+                if !sumValid {
+                    return true
+                }
+
+                dp.SetSum(sumDelta.FloatValue)
+            }
+
+            firstBucketIndex := 2
+            rawCounts := dp.BucketCounts().AsRaw()
+            for index := 0; index < len(rawCounts); index++ {
+                bucketId := (*baseIdentities)[firstBucketIndex+index]
+                bucketId.StartTimestamp = dp.StartTimestamp()
+                bucketId.Attributes = dp.Attributes()
+                bucketPoint := tracking.MetricPoint{
+                    Identity: bucketId,
+                    Value: tracking.ValuePoint{
+                        ObservedTimestamp: dp.Timestamp(),
+                        IntValue:          int64(rawCounts[index]),
+                    },
+                }
+                bucketDelta, bucketValid := ctdp.deltaCalculator.Convert(bucketPoint)
+                if !bucketValid {
+                    return true
+                }
+
+                rawCounts[index] = uint64(bucketDelta.IntValue)
+            }
+            dp.SetBucketCounts(pcommon.NewImmutableUInt64Slice(rawCounts))
+
+            dp.SetStartTimestamp(countDelta.StartTimestamp)
+
+            return false
+        })
+    }
+}
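Taken together, the new code path converts every component of each histogram point independently and drops the point (RemoveIf returns true) whenever any component has no valid delta yet. A simplified end-to-end illustration of the net effect on one stream; histPoint is a hypothetical stand-in for pmetric.HistogramDataPoint that ignores timestamps and attributes:

package main

import "fmt"

// histPoint is a simplified stand-in for a histogram data point.
type histPoint struct {
    count   uint64
    sum     float64
    buckets []uint64
}

// delta differences every component of cur against prev, mirroring how
// convertHistogramDataPoints routes count, sum, and each bucket through the
// delta calculator as separate identities.
func delta(prev, cur histPoint) histPoint {
    out := histPoint{count: cur.count - prev.count, sum: cur.sum - prev.sum}
    for i := range cur.buckets {
        out.buckets = append(out.buckets, cur.buckets[i]-prev.buckets[i])
    }
    return out
}

func main() {
    t0 := histPoint{count: 10, sum: 3.5, buckets: []uint64{4, 6}}
    t1 := histPoint{count: 16, sum: 5.0, buckets: []uint64{7, 9}}
    fmt.Printf("%+v\n", delta(t0, t1)) // {count:6 sum:1.5 buckets:[3 3]}
}

One caveat visible in the patch: the identity list is sized from the first data point's bucket count, so the conversion assumes every point in a stream shares the same bucket layout.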