Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/transform] Add functionality to break up a summary metric into Sum metrics. #11041

Merged
merged 11 commits into from
Jun 21, 2022
6 changes: 6 additions & 0 deletions processor/transformprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ Metric only functions:
- `convert_sum_to_gauge()` - Converts incoming metrics of type "Sum" to type "Gauge", retaining the metric's datapoints. Noop for metrics that are not of type "Sum".
**NOTE:** This function may cause a metric to break semantics for [Gauge metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/datamodel.md#gauge). Use at your own risk.

- `convert_summary_count_val_to_sum(aggregation_temporality, is_monotonic)` - Creates a new Sum metric out of incoming metrics of type "Summary" with a "Count" Value.
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved
**NOTE:** This function may cause a metric to break semantics for [Sum metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/datamodel.md#sums). Use at your own risk.

- `convert_summary_sum_val_to_sum(aggregation_temporality, is_monotonic)` - Creates a new Sum metric out of incoming metrics of type "Summary" with a "Sum" Value.
**NOTE:** This function may cause a metric to break semantics for [Sum metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/datamodel.md#sums). Use at your own risk.

- `convert_gauge_to_sum(aggregation_temporality, is_monotonic)` - `aggregation_temporality` specifies the resultant metric's aggregation temporality. `aggregation_temporality` may be `"cumulative"` or `"delta"`. `is_monotonic` specifies the resultant metric's monotonicity. `is_monotonic` is a boolean. Converts incoming metrics of type "Gauge" to type "Sum", retaining the metric's datapoints and setting its aggregation temporality and monotonicity accordingly. Noop for metrics that are not of type "Gauge".
**NOTE:** This function may cause a metric to break semantics for [Sum metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/datamodel.md#sums). Use at your own risk.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"

import (
"fmt"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
"go.opentelemetry.io/collector/pdata/pmetric"
)

func convertSummarySumValToSum(stringAggTemp string, monotonic bool) (common.ExprFunc, error) {
var aggTemp pmetric.MetricAggregationTemporality
switch stringAggTemp {
case "delta":
aggTemp = pmetric.MetricAggregationTemporalityDelta
case "cumulative":
aggTemp = pmetric.MetricAggregationTemporalityCumulative
default:
return nil, fmt.Errorf("unknown aggregation temporality: %s", stringAggTemp)
}
return func(ctx common.TransformContext) interface{} {
mtc, ok := ctx.(metricTransformContext)
if !ok {
return nil
}

metric := mtc.GetMetric()
if metric.DataType() != pmetric.MetricDataTypeSummary {
return nil
}

sumMetric := mtc.GetMetrics().AppendEmpty()
sumMetric.SetDescription(metric.Description())
sumMetric.SetName(metric.Name() + "_sum")
sumMetric.SetDataType(pmetric.MetricDataTypeSum)
sumMetric.Sum().SetAggregationTemporality(aggTemp)
sumMetric.Sum().SetIsMonotonic(monotonic)

sumDps := sumMetric.Sum().DataPoints()
dps := metric.Summary().DataPoints()
for i := 0; i < dps.Len(); i++ {
dp := dps.At(i)
sumDp := sumDps.AppendEmpty()
dp.Attributes().CopyTo(sumDp.Attributes())
sumDp.SetDoubleVal(dp.Sum())
}
return nil
}, nil
}

func convertSummaryCountValToSum(stringAggTemp string, monotonic bool) (common.ExprFunc, error) {
var aggTemp pmetric.MetricAggregationTemporality
switch stringAggTemp {
case "delta":
aggTemp = pmetric.MetricAggregationTemporalityDelta
case "cumulative":
aggTemp = pmetric.MetricAggregationTemporalityCumulative
default:
return nil, fmt.Errorf("unknown aggregation temporality: %s", stringAggTemp)
}
return func(ctx common.TransformContext) interface{} {
mtc, ok := ctx.(metricTransformContext)
if !ok {
return nil
}

metric := mtc.GetMetric()
if metric.DataType() != pmetric.MetricDataTypeSummary {
return nil
}

sumMetric := mtc.GetMetrics().AppendEmpty()
sumMetric.SetDescription(metric.Description())
sumMetric.SetName(metric.Name() + "_count")
sumMetric.SetDataType(pmetric.MetricDataTypeSum)
sumMetric.Sum().SetAggregationTemporality(aggTemp)
sumMetric.Sum().SetIsMonotonic(monotonic)

sumDps := sumMetric.Sum().DataPoints()
dps := metric.Summary().DataPoints()
for i := 0; i < dps.Len(); i++ {
dp := dps.At(i)
sumDp := sumDps.AppendEmpty()
dp.Attributes().CopyTo(sumDp.Attributes())
sumDp.SetIntVal(int64(dp.Count()))
}
return nil
}, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package metrics

import (
"testing"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common/testhelper"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

func getTestSummaryMetric() pmetric.Metric {
metricInput := pmetric.NewMetric()
metricInput.SetDataType(pmetric.MetricDataTypeSummary)
metricInput.SetName("summary_metric")
input := metricInput.Summary().DataPoints().AppendEmpty()
input.SetCount(100)
input.SetSum(12.34)

qVal1 := input.QuantileValues().AppendEmpty()
qVal1.SetValue(1)
qVal1.SetQuantile(.99)

qVal2 := input.QuantileValues().AppendEmpty()
qVal2.SetValue(2)
qVal2.SetQuantile(.95)

qVal3 := input.QuantileValues().AppendEmpty()
qVal3.SetValue(3)
qVal3.SetQuantile(.50)

attrs := getTestAttributes()
attrs.CopyTo(input.Attributes())
return metricInput
}

func getTestAttributes() pcommon.Map {
attrs := pcommon.NewMap()
attrs.InsertString("test", "hello world")
attrs.InsertInt("test2", 3)
attrs.InsertBool("test3", true)
return attrs
}

func TestConvertSummaryTransforms(t *testing.T) {
tests := []struct {
name string
inv common.Invocation
want func(pmetric.MetricSlice)
}{
{
name: "convert_summary_count_val_to_sum",
inv: common.Invocation{
Function: "convert_summary_count_val_to_sum",
Arguments: []common.Value{
{
String: testhelper.Strp("delta"),
},
{
Bool: (*common.Boolean)(testhelper.Boolp(false)),
},
},
},
want: func(metrics pmetric.MetricSlice) {
summaryMetric := getTestSummaryMetric()
summaryMetric.CopyTo(metrics.AppendEmpty())
sumMetric := metrics.AppendEmpty()
sumMetric.SetDataType(pmetric.MetricDataTypeSum)
sumMetric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta)
sumMetric.Sum().SetIsMonotonic(false)

sumMetric.SetName("summary_metric_count")
dp := sumMetric.Sum().DataPoints().AppendEmpty()
dp.SetIntVal(100)

attrs := getTestAttributes()
attrs.CopyTo(dp.Attributes())
},
},
{
name: "convert_summary_sum_val_to_sum",
inv: common.Invocation{
Function: "convert_summary_sum_val_to_sum",
Arguments: []common.Value{
{
String: testhelper.Strp("delta"),
},
{
Bool: (*common.Boolean)(testhelper.Boolp(false)),
},
},
},
want: func(metrics pmetric.MetricSlice) {
summaryMetric := getTestSummaryMetric()
summaryMetric.CopyTo(metrics.AppendEmpty())
sumMetric := metrics.AppendEmpty()
sumMetric.SetDataType(pmetric.MetricDataTypeSum)
sumMetric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta)
sumMetric.Sum().SetIsMonotonic(false)

sumMetric.SetName("summary_metric_sum")
dp := sumMetric.Sum().DataPoints().AppendEmpty()
dp.SetDoubleVal(12.34)

attrs := getTestAttributes()
attrs.CopyTo(dp.Attributes())
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actualMetrics := pmetric.NewMetricSlice()
summaryMetric := getTestSummaryMetric()
summaryMetric.CopyTo(actualMetrics.AppendEmpty())

evaluate, err := common.NewFunctionCall(tt.inv, DefaultFunctions(), ParsePath)
assert.NoError(t, err)
evaluate(metricTransformContext{
il: pcommon.NewInstrumentationScope(),
resource: pcommon.NewResource(),
metric: summaryMetric,
metrics: actualMetrics,
})

expected := pmetric.NewMetricSlice()
tt.want(expected)
assert.Equal(t, expected, actualMetrics)
})
}
}
6 changes: 4 additions & 2 deletions processor/transformprocessor/internal/metrics/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (

// registry is a map of names to functions for metrics pipelines
var registry = map[string]interface{}{
"convert_sum_to_gauge": convertSumToGauge,
"convert_gauge_to_sum": convertGaugeToSum,
"convert_sum_to_gauge": convertSumToGauge,
"convert_gauge_to_sum": convertGaugeToSum,
"convert_summary_sum_val_to_sum": convertSummarySumValToSum,
"convert_summary_count_val_to_sum": convertSummaryCountValToSum,
}

func init() {
Expand Down
5 changes: 5 additions & 0 deletions processor/transformprocessor/internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
type metricTransformContext struct {
dataPoint interface{}
metric pmetric.Metric
metrics pmetric.MetricSlice
il pcommon.InstrumentationScope
resource pcommon.Resource
}
Expand All @@ -47,6 +48,10 @@ func (ctx metricTransformContext) GetMetric() pmetric.Metric {
return ctx.metric
}

func (ctx metricTransformContext) GetMetrics() pmetric.MetricSlice {
return ctx.metrics
}

// pathGetSetter is a getSetter which has been resolved using a path expression provided by a user.
type pathGetSetter struct {
getter common.ExprFunc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (p *Processor) ProcessMetrics(_ context.Context, td pmetric.Metrics) (pmetr
smetrics := rmetrics.ScopeMetrics().At(j)
ctx.il = smetrics.Scope()
metrics := smetrics.Metrics()
ctx.metrics = metrics
for k := 0; k < metrics.Len(); k++ {
ctx.metric = metrics.At(k)
switch ctx.metric.DataType() {
Expand Down