Skip to content

Commit

Permalink
Implement cumulative-to-delta over histograms
Browse files Browse the repository at this point in the history
  • Loading branch information
mistodon committed Jul 18, 2022
1 parent 4e86e77 commit 24c666d
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type MetricIdentity struct {
StartTimestamp pcommon.Timestamp
Attributes pcommon.Map
MetricValueType pmetric.NumberDataPointValueType
MetricField string
}

const A = int32('A')
Expand Down Expand Up @@ -75,12 +76,17 @@ func (mi *MetricIdentity) Write(b *bytes.Buffer) {
})
b.WriteByte(SEP)
b.WriteString(strconv.FormatInt(int64(mi.StartTimestamp), 36))

if mi.MetricField != "" {
b.WriteByte(SEP)
b.WriteString(mi.MetricField)
}
}

// IsFloatVal reports whether this identity's numeric value type is
// double-precision floating point rather than integer.
func (mi *MetricIdentity) IsFloatVal() bool {
	switch mi.MetricValueType {
	case pmetric.NumberDataPointValueTypeDouble:
		return true
	default:
		return false
	}
}

// IsSupportedMetricType reports whether this identity's data type is one the
// cumulative-to-delta conversion can handle: sums and histograms.
// NOTE: the original span contained two consecutive return statements (a stale
// pre-change line left behind), making the second unreachable; only the
// current histogram-aware condition is kept.
func (mi *MetricIdentity) IsSupportedMetricType() bool {
	return mi.MetricDataType == pmetric.MetricDataTypeSum || mi.MetricDataType == pmetric.MetricDataTypeHistogram
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func TestMetricIdentity_Write(t *testing.T) {
StartTimestamp pcommon.Timestamp
Attributes pcommon.Map
MetricValueType pmetric.NumberDataPointValueType
MetricField string
}
tests := []struct {
name string
Expand Down Expand Up @@ -72,6 +73,18 @@ func TestMetricIdentity_Write(t *testing.T) {
},
want: []string{"C" + SEPSTR + "B", "Y"},
},
{
name: "histogram sum",
fields: fields{
Resource: resource,
InstrumentationLibrary: il,
Attributes: attributes,
MetricDataType: pmetric.MetricDataTypeHistogram,
MetricValueType: pmetric.NumberDataPointValueTypeInt,
MetricField: "bound_100",
},
want: []string{"D" + SEPSTR + "B", "bound_100"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -85,6 +98,7 @@ func TestMetricIdentity_Write(t *testing.T) {
StartTimestamp: tt.fields.StartTimestamp,
Attributes: tt.fields.Attributes,
MetricValueType: tt.fields.MetricValueType,
MetricField: tt.fields.MetricField,
}
b := &bytes.Buffer{}
mi.Write(b)
Expand Down Expand Up @@ -159,6 +173,13 @@ func TestMetricIdentity_IsSupportedMetricType(t *testing.T) {
fields: fields{
MetricDataType: pmetric.MetricDataTypeHistogram,
},
want: true,
},
{
name: "gauge",
fields: fields{
MetricDataType: pmetric.MetricDataTypeGauge,
},
want: false,
},
}
Expand Down
116 changes: 116 additions & 0 deletions processor/cumulativetodeltaprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
package cumulativetodeltaprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor"

import (
"bytes"
"context"
"fmt"
"math"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"

Expand Down Expand Up @@ -89,6 +92,47 @@ func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pme
MetricIsMonotonic: ms.IsMonotonic(),
}
ctdp.convertDataPoints(ms.DataPoints(), baseIdentity)
ms.SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta)
return ms.DataPoints().Len() == 0
case pmetric.MetricDataTypeHistogram:
ms := m.Histogram()
if ms.AggregationTemporality() != pmetric.MetricAggregationTemporalityCumulative {
return false
}

histogramIdentities := make([]tracking.MetricIdentity, 0, 16)

countIdentity := tracking.MetricIdentity{
Resource: rm.Resource(),
InstrumentationLibrary: ilm.Scope(),
MetricDataType: m.DataType(),
MetricName: m.Name(),
MetricUnit: m.Unit(),
MetricIsMonotonic: true,
MetricValueType: pmetric.NumberDataPointValueTypeInt,
MetricField: "count",
}

sumIdentity := countIdentity
sumIdentity.MetricField = "sum"
sumIdentity.MetricValueType = pmetric.NumberDataPointValueTypeDouble

histogramIdentities = append(histogramIdentities, countIdentity, sumIdentity)

if ms.DataPoints().Len() == 0 {
return false
}

firstDataPoint := ms.DataPoints().At(0)
for index := 0; index < firstDataPoint.BucketCounts().Len(); index++ {
metricField := fmt.Sprintf("bucket_%d", index)
bucketIdentity := countIdentity
bucketIdentity.MetricField = metricField
histogramIdentities = append(histogramIdentities, bucketIdentity)
}

ctdp.convertHistogramDataPoints(ms.DataPoints(), &histogramIdentities)

ms.SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta)
return ms.DataPoints().Len() == 0
default:
Expand Down Expand Up @@ -159,3 +203,75 @@ func (ctdp *cumulativeToDeltaProcessor) convertDataPoints(in interface{}, baseId
})
}
}

// convertHistogramDataPoints converts cumulative histogram data points to
// delta temporality in place, tracking per-stream state through
// ctdp.deltaCalculator.
//
// baseIdentities must hold the pre-built stream identities in a fixed layout:
// index 0 is the "count" series, index 1 the "sum" series, and indices 2..
// one identity per bucket (sized from the metric's first data point by the
// caller). Data points for which a delta cannot yet be computed — e.g. the
// first observation of a stream — are removed from the slice.
func (ctdp *cumulativeToDeltaProcessor) convertHistogramDataPoints(in interface{}, baseIdentities *[]tracking.MetricIdentity) {
	dps, ok := in.(pmetric.HistogramDataPointSlice)
	if !ok {
		return
	}

	dps.RemoveIf(func(dp pmetric.HistogramDataPoint) bool {
		// Convert the total observation count.
		countID := (*baseIdentities)[0]
		countID.StartTimestamp = dp.StartTimestamp()
		countID.Attributes = dp.Attributes()
		countPoint := tracking.MetricPoint{
			Identity: countID,
			Value: tracking.ValuePoint{
				ObservedTimestamp: dp.Timestamp(),
				IntValue:          int64(dp.Count()),
			},
		}
		countDelta, countValid := ctdp.deltaCalculator.Convert(countPoint)
		if !countValid {
			return true
		}
		dp.SetCount(uint64(countDelta.IntValue))

		// Convert the sum when present (sum is optional on histograms).
		if dp.HasSum() {
			sumID := (*baseIdentities)[1]
			sumID.StartTimestamp = dp.StartTimestamp()
			sumID.Attributes = dp.Attributes()
			sumPoint := tracking.MetricPoint{
				Identity: sumID,
				Value: tracking.ValuePoint{
					ObservedTimestamp: dp.Timestamp(),
					FloatValue:        dp.Sum(),
				},
			}
			sumDelta, sumValid := ctdp.deltaCalculator.Convert(sumPoint)
			if !sumValid {
				return true
			}
			dp.SetSum(sumDelta.FloatValue)
		}

		// Convert each bucket count; bucket identities start at index 2.
		// Debug printing of each bucket identity (bytes.Buffer + fmt.Println)
		// that was previously here has been removed — it wrote to stdout for
		// every bucket of every data point.
		const firstBucketIndex = 2
		rawCounts := dp.BucketCounts().AsRaw()
		if len(rawCounts) > len(*baseIdentities)-firstBucketIndex {
			// This point has more buckets than the identities built from the
			// metric's first data point; drop it rather than index out of
			// range on a malformed/mismatched point.
			return true
		}
		for i, c := range rawCounts {
			bucketID := (*baseIdentities)[firstBucketIndex+i]
			bucketID.StartTimestamp = dp.StartTimestamp()
			bucketID.Attributes = dp.Attributes()
			bucketPoint := tracking.MetricPoint{
				Identity: bucketID,
				Value: tracking.ValuePoint{
					ObservedTimestamp: dp.Timestamp(),
					IntValue:          int64(c),
				},
			}
			bucketDelta, bucketValid := ctdp.deltaCalculator.Convert(bucketPoint)
			if !bucketValid {
				return true
			}
			rawCounts[i] = uint64(bucketDelta.IntValue)
		}
		dp.SetBucketCounts(pcommon.NewImmutableUInt64Slice(rawCounts))

		// The delta point's start time is the end of the previous interval,
		// as reported by the calculator.
		dp.SetStartTimestamp(countDelta.StartTimestamp)

		return false
	})
}

0 comments on commit 24c666d

Please sign in to comment.