Skip to content

Commit

Permalink
[processor/transform] Add functionality to break up a summary metric …
Browse files Browse the repository at this point in the history
…into Sum metrics. (#11041)

* Add Summary value extraction functions

* Sepearate into seperate func files

* Add clarification to readme

* Add more tests

* Update readme note

* gofmt

* Fix links
  • Loading branch information
Miguel Rodriguez authored Jun 21, 2022
1 parent f86cd6e commit b7d9045
Show file tree
Hide file tree
Showing 11 changed files with 668 additions and 20 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

## 🛑 Breaking changes 🛑
- `transformprocessor`: `metric.is_monotonic` is now accessed via a bool literal instead of a string. (#10473)

- `vcenterreceiver`: Changed the attribute `effective` on `vcenter.cluster.host.count` as it will now be reported as a bool rather than a string (#10914)

### 🚩 Deprecations 🚩
Expand All @@ -26,6 +25,7 @@
- `metricstransformprocessor`: Migrate the processor from OC to pdata (#10817)
- This behavior can be reverted by disabling the `processor.metricstransformprocessor.UseOTLPDataModel` feature gate.
- `transformprocessor`: Add byte slice literal to the grammar. Add new SpanID and TraceID functions that take a byte slice and return a Span/Trace ID. (#10487)
- `transformprocessor`: Add Summary transform functions. (#11041)
- `transformprocessor`: Add nil literal to the grammar. (#11150)
- `elasticsearchreceiver`: Add integration test for elasticsearch receiver (#10165)
- `tailsamplingprocessor`: New sampler added that allows to sample based on minimum number of spans
Expand Down
9 changes: 8 additions & 1 deletion processor/transformprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,14 @@ Metric only functions:
- `convert_sum_to_gauge()` - Converts incoming metrics of type "Sum" to type "Gauge", retaining the metric's datapoints. Noop for metrics that are not of type "Sum".
**NOTE:** This function may cause a metric to break semantics for [Gauge metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md#gauge). Use at your own risk.

- `convert_gauge_to_sum(aggregation_temporality, is_monotonic)` - `aggregation_temporality` specifies the resultant metric's aggregation temporality. `aggregation_temporality` may be `"cumulative"` or `"delta"`. `is_monotonic` specifies the resultant metric's monotonicity. `is_monotonic` is a boolean. Converts incoming metrics of type "Gauge" to type "Sum", retaining the metric's datapoints and setting its aggregation temporality and monotonicity accordingly. Noop for metrics that are not of type "Gauge".
- `convert_summary_count_val_to_sum(aggregation_temporality, is_monotonic)` - Creates a new Sum metric out of incoming metrics of type "Summary" with a "Count" Value. Noop for metrics that are not of type "Summary". The name for the new metric with be `<summary metric>_count`. The fields that are copied are: `timestamp`, `starttimestamp`, `attibutes`, and `description`.
**NOTE:** This function may cause a metric to break semantics for [Sum metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md#sums). Use at your own risk.

- `convert_summary_sum_val_to_sum(aggregation_temporality, is_monotonic)` - Creates a new Sum metric out of incoming metrics of type "Summary" with a "Sum" Value. Noop for metrics that are not of type "Summary". The name for the new metric with be `<summary metric>_sum`. The fields that are copied are: `timestamp`, `starttimestamp`, `attibutes`, and `description`. The new metric that is created will be passed to all functions in the metrics queries list. Function conditions will apply.
**NOTE:** This function may cause a metric to break semantics for [Sum metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md#sums). Use at your own risk.


- `convert_gauge_to_sum(aggregation_temporality, is_monotonic)` - `aggregation_temporality` specifies the resultant metric's aggregation temporality. `aggregation_temporality` may be `"cumulative"` or `"delta"`. `is_monotonic` specifies the resultant metric's monotonicity. `is_monotonic` is a boolean. Converts incoming metrics of type "Gauge" to type "Sum", retaining the metric's datapoints and setting its aggregation temporality and monotonicity accordingly. Noop for metrics that are not of type "Gauge". The new metric that is created will be passed to all functions in the metrics queries list. Function conditions will apply.
**NOTE:** This function may cause a metric to break semantics for [Sum metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md#sums). Use at your own risk.

Supported where operations:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"

import (
"fmt"

"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

func convertSummaryCountValToSum(stringAggTemp string, monotonic bool) (common.ExprFunc, error) {
var aggTemp pmetric.MetricAggregationTemporality
switch stringAggTemp {
case "delta":
aggTemp = pmetric.MetricAggregationTemporalityDelta
case "cumulative":
aggTemp = pmetric.MetricAggregationTemporalityCumulative
default:
return nil, fmt.Errorf("unknown aggregation temporality: %s", stringAggTemp)
}
return func(ctx common.TransformContext) interface{} {
mtc, ok := ctx.(metricTransformContext)
if !ok {
return nil
}

metric := mtc.GetMetric()
if metric.DataType() != pmetric.MetricDataTypeSummary {
return nil
}

sumMetric := mtc.GetMetrics().AppendEmpty()
sumMetric.SetDescription(metric.Description())
sumMetric.SetName(metric.Name() + "_count")
sumMetric.SetUnit(metric.Unit())
sumMetric.SetDataType(pmetric.MetricDataTypeSum)
sumMetric.Sum().SetAggregationTemporality(aggTemp)
sumMetric.Sum().SetIsMonotonic(monotonic)

sumDps := sumMetric.Sum().DataPoints()
dps := metric.Summary().DataPoints()
for i := 0; i < dps.Len(); i++ {
dp := dps.At(i)
sumDp := sumDps.AppendEmpty()
dp.Attributes().CopyTo(sumDp.Attributes())
sumDp.SetIntVal(int64(dp.Count()))
sumDp.SetStartTimestamp(dp.StartTimestamp())
sumDp.SetTimestamp(dp.Timestamp())
}
return nil
}, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common/testhelper"
)

func getTestSummaryMetric() pmetric.Metric {
metricInput := pmetric.NewMetric()
metricInput.SetDataType(pmetric.MetricDataTypeSummary)
metricInput.SetName("summary_metric")
input := metricInput.Summary().DataPoints().AppendEmpty()
input.SetCount(100)
input.SetSum(12.34)

qVal1 := input.QuantileValues().AppendEmpty()
qVal1.SetValue(1)
qVal1.SetQuantile(.99)

qVal2 := input.QuantileValues().AppendEmpty()
qVal2.SetValue(2)
qVal2.SetQuantile(.95)

qVal3 := input.QuantileValues().AppendEmpty()
qVal3.SetValue(3)
qVal3.SetQuantile(.50)

attrs := getTestAttributes()
attrs.CopyTo(input.Attributes())
return metricInput
}

func getTestGaugeMetric() pmetric.Metric {
metricInput := pmetric.NewMetric()
metricInput.SetDataType(pmetric.MetricDataTypeGauge)
metricInput.SetName("gauge_metric")
input := metricInput.Gauge().DataPoints().AppendEmpty()
input.SetIntVal(12)

attrs := getTestAttributes()
attrs.CopyTo(input.Attributes())
return metricInput
}

func getTestAttributes() pcommon.Map {
attrs := pcommon.NewMap()
attrs.InsertString("test", "hello world")
attrs.InsertInt("test2", 3)
attrs.InsertBool("test3", true)
return attrs
}

func summaryTest(tests []summaryTestCase, t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actualMetrics := pmetric.NewMetricSlice()
tt.input.CopyTo(actualMetrics.AppendEmpty())

evaluate, err := common.NewFunctionCall(tt.inv, DefaultFunctions(), ParsePath)
assert.NoError(t, err)
evaluate(metricTransformContext{
il: pcommon.NewInstrumentationScope(),
resource: pcommon.NewResource(),
metric: tt.input,
metrics: actualMetrics,
})

expected := pmetric.NewMetricSlice()
tt.want(expected)
assert.Equal(t, expected, actualMetrics)
})
}
}

type summaryTestCase struct {
name string
input pmetric.Metric
inv common.Invocation
want func(pmetric.MetricSlice)
}

func Test_ConvertSummarySumValToSum(t *testing.T) {
tests := []summaryTestCase{
{
name: "convert_summary_sum_val_to_sum",
input: getTestSummaryMetric(),
inv: common.Invocation{
Function: "convert_summary_sum_val_to_sum",
Arguments: []common.Value{
{
String: testhelper.Strp("delta"),
},
{
Bool: (*common.Boolean)(testhelper.Boolp(false)),
},
},
},
want: func(metrics pmetric.MetricSlice) {
summaryMetric := getTestSummaryMetric()
summaryMetric.CopyTo(metrics.AppendEmpty())
sumMetric := metrics.AppendEmpty()
sumMetric.SetDataType(pmetric.MetricDataTypeSum)
sumMetric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta)
sumMetric.Sum().SetIsMonotonic(false)

sumMetric.SetName("summary_metric_sum")
dp := sumMetric.Sum().DataPoints().AppendEmpty()
dp.SetDoubleVal(12.34)

attrs := getTestAttributes()
attrs.CopyTo(dp.Attributes())
},
},
{
name: "convert_summary_sum_val_to_sum (monotonic)",
input: getTestSummaryMetric(),
inv: common.Invocation{
Function: "convert_summary_sum_val_to_sum",
Arguments: []common.Value{
{
String: testhelper.Strp("delta"),
},
{
Bool: (*common.Boolean)(testhelper.Boolp(true)),
},
},
},
want: func(metrics pmetric.MetricSlice) {
summaryMetric := getTestSummaryMetric()
summaryMetric.CopyTo(metrics.AppendEmpty())
sumMetric := metrics.AppendEmpty()
sumMetric.SetDataType(pmetric.MetricDataTypeSum)
sumMetric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta)
sumMetric.Sum().SetIsMonotonic(true)

sumMetric.SetName("summary_metric_sum")
dp := sumMetric.Sum().DataPoints().AppendEmpty()
dp.SetDoubleVal(12.34)

attrs := getTestAttributes()
attrs.CopyTo(dp.Attributes())
},
},
{
name: "convert_summary_sum_val_to_sum (cumulative)",
input: getTestSummaryMetric(),
inv: common.Invocation{
Function: "convert_summary_sum_val_to_sum",
Arguments: []common.Value{
{
String: testhelper.Strp("cumulative"),
},
{
Bool: (*common.Boolean)(testhelper.Boolp(false)),
},
},
},
want: func(metrics pmetric.MetricSlice) {
summaryMetric := getTestSummaryMetric()
summaryMetric.CopyTo(metrics.AppendEmpty())
sumMetric := metrics.AppendEmpty()
sumMetric.SetDataType(pmetric.MetricDataTypeSum)
sumMetric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative)
sumMetric.Sum().SetIsMonotonic(false)

sumMetric.SetName("summary_metric_sum")
dp := sumMetric.Sum().DataPoints().AppendEmpty()
dp.SetDoubleVal(12.34)

attrs := getTestAttributes()
attrs.CopyTo(dp.Attributes())
},
},
{
name: "convert_summary_sum_val_to_sum (no op)",
input: getTestGaugeMetric(),
inv: common.Invocation{
Function: "convert_summary_sum_val_to_sum",
Arguments: []common.Value{
{
String: testhelper.Strp("delta"),
},
{
Bool: (*common.Boolean)(testhelper.Boolp(false)),
},
},
},
want: func(metrics pmetric.MetricSlice) {
gaugeMetric := getTestGaugeMetric()
gaugeMetric.CopyTo(metrics.AppendEmpty())
},
},
}
summaryTest(tests, t)
}

func Test_ConvertSummarySumValToSum_validation(t *testing.T) {
tests := []struct {
name string
stringAggTemp string
}{
{
name: "invalid aggregation temporality",
stringAggTemp: "not a real aggregation temporality",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := convertSummarySumValToSum(tt.stringAggTemp, true)
assert.Error(t, err, "unknown aggregation temporality: not a real aggregation temporality")
})
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"

import (
"fmt"

"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

func convertSummarySumValToSum(stringAggTemp string, monotonic bool) (common.ExprFunc, error) {
var aggTemp pmetric.MetricAggregationTemporality
switch stringAggTemp {
case "delta":
aggTemp = pmetric.MetricAggregationTemporalityDelta
case "cumulative":
aggTemp = pmetric.MetricAggregationTemporalityCumulative
default:
return nil, fmt.Errorf("unknown aggregation temporality: %s", stringAggTemp)
}
return func(ctx common.TransformContext) interface{} {
mtc, ok := ctx.(metricTransformContext)
if !ok {
return nil
}

metric := mtc.GetMetric()
if metric.DataType() != pmetric.MetricDataTypeSummary {
return nil
}

sumMetric := mtc.GetMetrics().AppendEmpty()
sumMetric.SetDescription(metric.Description())
sumMetric.SetName(metric.Name() + "_sum")
sumMetric.SetUnit(metric.Unit())
sumMetric.SetDataType(pmetric.MetricDataTypeSum)
sumMetric.Sum().SetAggregationTemporality(aggTemp)
sumMetric.Sum().SetIsMonotonic(monotonic)

sumDps := sumMetric.Sum().DataPoints()
dps := metric.Summary().DataPoints()
for i := 0; i < dps.Len(); i++ {
dp := dps.At(i)
sumDp := sumDps.AppendEmpty()
dp.Attributes().CopyTo(sumDp.Attributes())
sumDp.SetDoubleVal(dp.Sum())
sumDp.SetStartTimestamp(dp.StartTimestamp())
sumDp.SetTimestamp(dp.Timestamp())
}
return nil
}, nil
}
Loading

0 comments on commit b7d9045

Please sign in to comment.