Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/transform] Add functionality to break up a summary metric into Sum metrics. #11041

Merged
merged 11 commits into from
Jun 21, 2022
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

## 🛑 Breaking changes 🛑
- `transformprocessor`: `metric.is_monotonic` is now accessed via a bool literal instead of a string. (#10473)

- `vcenterreceiver`: Changed the attribute `effective` on `vcenter.cluster.host.count` as it will now be reported as a bool rather than a string (#10914)

### 🚩 Deprecations 🚩
Expand All @@ -26,6 +25,7 @@
- `metricstransformprocessor`: Migrate the processor from OC to pdata (#10817)
- This behavior can be reverted by disabling the `processor.metricstransformprocessor.UseOTLPDataModel` feature gate.
- `transformprocessor`: Add byte slice literal to the grammar. Add new SpanID and TraceID functions that take a byte slice and return a Span/Trace ID. (#10487)
- `transformprocessor`: Add Summary transform functions. (#11041)
- `transformprocessor`: Add nil literal to the grammar. (#11150)
- `elasticsearchreceiver`: Add integration test for elasticsearch receiver (#10165)
- `tailsamplingprocessor`: New sampler added that allows to sample based on minimum number of spans
Expand Down
9 changes: 8 additions & 1 deletion processor/transformprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,14 @@ Metric only functions:
- `convert_sum_to_gauge()` - Converts incoming metrics of type "Sum" to type "Gauge", retaining the metric's datapoints. Noop for metrics that are not of type "Sum".
**NOTE:** This function may cause a metric to break semantics for [Gauge metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md#gauge). Use at your own risk.

- `convert_gauge_to_sum(aggregation_temporality, is_monotonic)` - `aggregation_temporality` specifies the resultant metric's aggregation temporality. `aggregation_temporality` may be `"cumulative"` or `"delta"`. `is_monotonic` specifies the resultant metric's monotonicity. `is_monotonic` is a boolean. Converts incoming metrics of type "Gauge" to type "Sum", retaining the metric's datapoints and setting its aggregation temporality and monotonicity accordingly. Noop for metrics that are not of type "Gauge".
- `convert_summary_count_val_to_sum(aggregation_temporality, is_monotonic)` - Creates a new Sum metric out of incoming metrics of type "Summary" with a "Count" Value. Noop for metrics that are not of type "Summary". The name for the new metric with be `<summary metric>_count`. The fields that are copied are: `timestamp`, `starttimestamp`, `attibutes`, and `description`.
**NOTE:** This function may cause a metric to break semantics for [Sum metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md#sums). Use at your own risk.

- `convert_summary_sum_val_to_sum(aggregation_temporality, is_monotonic)` - Creates a new Sum metric out of incoming metrics of type "Summary" with a "Sum" Value. Noop for metrics that are not of type "Summary". The name for the new metric with be `<summary metric>_sum`. The fields that are copied are: `timestamp`, `starttimestamp`, `attibutes`, and `description`. The new metric that is created will be passed to all functions in the metrics queries list. Function conditions will apply.
**NOTE:** This function may cause a metric to break semantics for [Sum metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md#sums). Use at your own risk.


- `convert_gauge_to_sum(aggregation_temporality, is_monotonic)` - `aggregation_temporality` specifies the resultant metric's aggregation temporality. `aggregation_temporality` may be `"cumulative"` or `"delta"`. `is_monotonic` specifies the resultant metric's monotonicity. `is_monotonic` is a boolean. Converts incoming metrics of type "Gauge" to type "Sum", retaining the metric's datapoints and setting its aggregation temporality and monotonicity accordingly. Noop for metrics that are not of type "Gauge". The new metric that is created will be passed to all functions in the metrics queries list. Function conditions will apply.
**NOTE:** This function may cause a metric to break semantics for [Sum metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md#sums). Use at your own risk.

Supported where operations:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"

import (
"fmt"

"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

func convertSummaryCountValToSum(stringAggTemp string, monotonic bool) (common.ExprFunc, error) {
var aggTemp pmetric.MetricAggregationTemporality
switch stringAggTemp {
case "delta":
aggTemp = pmetric.MetricAggregationTemporalityDelta
case "cumulative":
aggTemp = pmetric.MetricAggregationTemporalityCumulative
default:
return nil, fmt.Errorf("unknown aggregation temporality: %s", stringAggTemp)
}
return func(ctx common.TransformContext) interface{} {
mtc, ok := ctx.(metricTransformContext)
if !ok {
return nil
}

metric := mtc.GetMetric()
if metric.DataType() != pmetric.MetricDataTypeSummary {
return nil
}

sumMetric := mtc.GetMetrics().AppendEmpty()
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved
sumMetric.SetDescription(metric.Description())
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved
sumMetric.SetName(metric.Name() + "_count")
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved
sumMetric.SetUnit(metric.Unit())
sumMetric.SetDataType(pmetric.MetricDataTypeSum)
sumMetric.Sum().SetAggregationTemporality(aggTemp)
sumMetric.Sum().SetIsMonotonic(monotonic)

sumDps := sumMetric.Sum().DataPoints()
dps := metric.Summary().DataPoints()
for i := 0; i < dps.Len(); i++ {
dp := dps.At(i)
sumDp := sumDps.AppendEmpty()
dp.Attributes().CopyTo(sumDp.Attributes())
sumDp.SetIntVal(int64(dp.Count()))
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved
sumDp.SetStartTimestamp(dp.StartTimestamp())
sumDp.SetTimestamp(dp.Timestamp())
}
return nil
}, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common/testhelper"
)

func getTestSummaryMetric() pmetric.Metric {
metricInput := pmetric.NewMetric()
metricInput.SetDataType(pmetric.MetricDataTypeSummary)
metricInput.SetName("summary_metric")
input := metricInput.Summary().DataPoints().AppendEmpty()
input.SetCount(100)
input.SetSum(12.34)

qVal1 := input.QuantileValues().AppendEmpty()
qVal1.SetValue(1)
qVal1.SetQuantile(.99)

qVal2 := input.QuantileValues().AppendEmpty()
qVal2.SetValue(2)
qVal2.SetQuantile(.95)

qVal3 := input.QuantileValues().AppendEmpty()
qVal3.SetValue(3)
qVal3.SetQuantile(.50)

attrs := getTestAttributes()
attrs.CopyTo(input.Attributes())
return metricInput
}

func getTestGaugeMetric() pmetric.Metric {
metricInput := pmetric.NewMetric()
metricInput.SetDataType(pmetric.MetricDataTypeGauge)
metricInput.SetName("gauge_metric")
input := metricInput.Gauge().DataPoints().AppendEmpty()
input.SetIntVal(12)

attrs := getTestAttributes()
attrs.CopyTo(input.Attributes())
return metricInput
}

func getTestAttributes() pcommon.Map {
attrs := pcommon.NewMap()
attrs.InsertString("test", "hello world")
attrs.InsertInt("test2", 3)
attrs.InsertBool("test3", true)
return attrs
}

func summaryTest(tests []summaryTestCase, t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actualMetrics := pmetric.NewMetricSlice()
tt.input.CopyTo(actualMetrics.AppendEmpty())

evaluate, err := common.NewFunctionCall(tt.inv, DefaultFunctions(), ParsePath)
assert.NoError(t, err)
evaluate(metricTransformContext{
il: pcommon.NewInstrumentationScope(),
resource: pcommon.NewResource(),
metric: tt.input,
metrics: actualMetrics,
})

expected := pmetric.NewMetricSlice()
tt.want(expected)
assert.Equal(t, expected, actualMetrics)
})
}
}

type summaryTestCase struct {
name string
input pmetric.Metric
inv common.Invocation
want func(pmetric.MetricSlice)
}

func Test_ConvertSummarySumValToSum(t *testing.T) {
tests := []summaryTestCase{
{
name: "convert_summary_sum_val_to_sum",
input: getTestSummaryMetric(),
inv: common.Invocation{
Function: "convert_summary_sum_val_to_sum",
Arguments: []common.Value{
{
String: testhelper.Strp("delta"),
},
{
Bool: (*common.Boolean)(testhelper.Boolp(false)),
},
},
},
want: func(metrics pmetric.MetricSlice) {
summaryMetric := getTestSummaryMetric()
summaryMetric.CopyTo(metrics.AppendEmpty())
sumMetric := metrics.AppendEmpty()
sumMetric.SetDataType(pmetric.MetricDataTypeSum)
sumMetric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta)
sumMetric.Sum().SetIsMonotonic(false)

sumMetric.SetName("summary_metric_sum")
dp := sumMetric.Sum().DataPoints().AppendEmpty()
dp.SetDoubleVal(12.34)

attrs := getTestAttributes()
attrs.CopyTo(dp.Attributes())
},
},
{
name: "convert_summary_sum_val_to_sum (monotonic)",
input: getTestSummaryMetric(),
inv: common.Invocation{
Function: "convert_summary_sum_val_to_sum",
Arguments: []common.Value{
{
String: testhelper.Strp("delta"),
},
{
Bool: (*common.Boolean)(testhelper.Boolp(true)),
},
},
},
want: func(metrics pmetric.MetricSlice) {
summaryMetric := getTestSummaryMetric()
summaryMetric.CopyTo(metrics.AppendEmpty())
sumMetric := metrics.AppendEmpty()
sumMetric.SetDataType(pmetric.MetricDataTypeSum)
sumMetric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta)
sumMetric.Sum().SetIsMonotonic(true)

sumMetric.SetName("summary_metric_sum")
dp := sumMetric.Sum().DataPoints().AppendEmpty()
dp.SetDoubleVal(12.34)

attrs := getTestAttributes()
attrs.CopyTo(dp.Attributes())
},
},
{
name: "convert_summary_sum_val_to_sum (cumulative)",
input: getTestSummaryMetric(),
inv: common.Invocation{
Function: "convert_summary_sum_val_to_sum",
Arguments: []common.Value{
{
String: testhelper.Strp("cumulative"),
},
{
Bool: (*common.Boolean)(testhelper.Boolp(false)),
},
},
},
want: func(metrics pmetric.MetricSlice) {
summaryMetric := getTestSummaryMetric()
summaryMetric.CopyTo(metrics.AppendEmpty())
sumMetric := metrics.AppendEmpty()
sumMetric.SetDataType(pmetric.MetricDataTypeSum)
sumMetric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative)
sumMetric.Sum().SetIsMonotonic(false)

sumMetric.SetName("summary_metric_sum")
dp := sumMetric.Sum().DataPoints().AppendEmpty()
dp.SetDoubleVal(12.34)

attrs := getTestAttributes()
attrs.CopyTo(dp.Attributes())
},
},
{
name: "convert_summary_sum_val_to_sum (no op)",
input: getTestGaugeMetric(),
inv: common.Invocation{
Function: "convert_summary_sum_val_to_sum",
Arguments: []common.Value{
{
String: testhelper.Strp("delta"),
},
{
Bool: (*common.Boolean)(testhelper.Boolp(false)),
},
},
},
want: func(metrics pmetric.MetricSlice) {
gaugeMetric := getTestGaugeMetric()
gaugeMetric.CopyTo(metrics.AppendEmpty())
},
},
}
summaryTest(tests, t)
}

func Test_ConvertSummarySumValToSum_validation(t *testing.T) {
tests := []struct {
name string
stringAggTemp string
}{
{
name: "invalid aggregation temporality",
stringAggTemp: "not a real aggregation temporality",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := convertSummarySumValToSum(tt.stringAggTemp, true)
assert.Error(t, err, "unknown aggregation temporality: not a real aggregation temporality")
})
}
}
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"

import (
"fmt"

"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

func convertSummarySumValToSum(stringAggTemp string, monotonic bool) (common.ExprFunc, error) {
var aggTemp pmetric.MetricAggregationTemporality
switch stringAggTemp {
case "delta":
aggTemp = pmetric.MetricAggregationTemporalityDelta
case "cumulative":
aggTemp = pmetric.MetricAggregationTemporalityCumulative
default:
return nil, fmt.Errorf("unknown aggregation temporality: %s", stringAggTemp)
}
return func(ctx common.TransformContext) interface{} {
mtc, ok := ctx.(metricTransformContext)
if !ok {
return nil
}

metric := mtc.GetMetric()
if metric.DataType() != pmetric.MetricDataTypeSummary {
return nil
}

sumMetric := mtc.GetMetrics().AppendEmpty()
sumMetric.SetDescription(metric.Description())
sumMetric.SetName(metric.Name() + "_sum")
sumMetric.SetUnit(metric.Unit())
sumMetric.SetDataType(pmetric.MetricDataTypeSum)
sumMetric.Sum().SetAggregationTemporality(aggTemp)
sumMetric.Sum().SetIsMonotonic(monotonic)

sumDps := sumMetric.Sum().DataPoints()
dps := metric.Summary().DataPoints()
for i := 0; i < dps.Len(); i++ {
dp := dps.At(i)
sumDp := sumDps.AppendEmpty()
dp.Attributes().CopyTo(sumDp.Attributes())
sumDp.SetDoubleVal(dp.Sum())
sumDp.SetStartTimestamp(dp.StartTimestamp())
sumDp.SetTimestamp(dp.Timestamp())
}
return nil
}, nil
}
Loading