From 8f09079f8186881f540479309fce99bec23c71db Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 1 Jun 2022 10:37:24 -0400 Subject: [PATCH] [processor/transform] Add functions for conversion of scalar metric types (#10255) * add conversion functions for scalar metrics * fix failing factory test * lint * revert EqualsValue * update registry comment * changelog * move Bool rule down under string * Rename metric only functions header Co-authored-by: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> * add example in readme * capitialize Sum * Add disclaimer to README that functions may break semantics * use new testhelper package for test pointers from literals * move changelog entry to unreleased Co-authored-by: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> --- CHANGELOG.md | 1 + processor/transformprocessor/README.md | 11 + processor/transformprocessor/config_test.go | 3 +- processor/transformprocessor/factory_test.go | 3 +- .../internal/common/functions.go | 5 + .../internal/common/parser.go | 10 + .../internal/common/parser_test.go | 34 ++++ .../internal/common/testhelper/testhelper.go | 4 + .../internal/metrics/functions.go | 77 ++++++- .../internal/metrics/functions_test.go | 190 ++++++++++++++++++ 10 files changed, 334 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 003e2acaec30..fe589cd7c5b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ - `tailsamplingprocessor`: Add support for string invert matching to `and` policy (#9553) - `mezemoexporter`: Add user agent string to outgoing HTTP requests (#10470) +- `transformprocessor`: Add functions for conversion of scalar metric types (`gauge_to_sum` and `sum_to_gauge`) (#10255) ### 🧰 Bug fixes 🧰 diff --git a/processor/transformprocessor/README.md b/processor/transformprocessor/README.md index 5e4bff0cc68d..9be2f27eee96 100644 --- a/processor/transformprocessor/README.md +++ b/processor/transformprocessor/README.md @@ -37,6 +37,13 @@ the fields specified by the list of strings. e.g., `keep_keys(attributes, "http. - `replace_all_matches(target, pattern, replacement)` - `target` is a path expression to a map type field, `pattern` is a string following [filepath.Match syntax](https://pkg.go.dev/path/filepath#Match), and `replacement` is a string. Each string value in `target` that matches `pattern` will get replaced with `replacement`. e.g., `replace_all_matches(attributes, "/user/*/list/*", "/user/{userId}/list/{listId}")` +Metric only functions: +- `convert_sum_to_gauge()` - Converts incoming metrics of type "Sum" to type "Gauge", retaining the metric's datapoints. Noop for metrics that are not of type "Sum". +**NOTE:** This function may cause a metric to break semantics for [Gauge metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/datamodel.md#gauge). Use at your own risk. + +- `convert_gauge_to_sum(aggregation_temporality, is_monotonic)` - `aggregation_temporality` specifies the resultant metric's aggregation temporality. `aggregation_temporality` may be `"cumulative"` or `"delta"`. `is_monotonic` specifies the resultant metric's monotonicity. `is_monotonic` is a boolean. Converts incoming metrics of type "Gauge" to type "Sum", retaining the metric's datapoints and setting its aggregation temporality and monotonicity accordingly. Noop for metrics that are not of type "Gauge". +**NOTE:** This function may cause a metric to break semantics for [Sum metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/datamodel.md#sums). Use at your own risk. + Supported where operations: - `==` - matches telemetry where the values are equal to each other - `!=` - matches telemetry where the values are not equal to each other @@ -70,6 +77,8 @@ processors: - limit(attributes, 100) - truncate_all(attributes, 4096) - truncate_all(resource.attributes, 4096) + - convert_sum_to_gauge() where metric.name == "system.processes.count" + - convert_gauge_to_sum("cumulative", false) where metric.name == "prometheus_metric" logs: queries: - set(severity_text, "FAIL") where body == "request failed" @@ -108,6 +117,8 @@ All metrics and their data points 4) Limit all data point attributes such that each data point has no more than 100 attributes. 6) Truncate all data point attributes such that no string value has more than 4096 characters. 7) Truncate all resource attributes such that no string value has more than 4096 characters. +8) Convert all metrics with name `system.processes.count` from a Sum to Gauge. +9) Convert all metrics with name `prometheus_metric` from Gauge to a cumulative, non-monotonic Sum. All logs diff --git a/processor/transformprocessor/config_test.go b/processor/transformprocessor/config_test.go index 5d15e19f9506..635fbb50abd4 100644 --- a/processor/transformprocessor/config_test.go +++ b/processor/transformprocessor/config_test.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/service/servicetest" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs" @@ -56,7 +57,7 @@ func TestLoadingConfig(t *testing.T) { `keep_keys(attributes, "http.method", "http.path")`, }, - functions: traces.DefaultFunctions(), + functions: metrics.DefaultFunctions(), }, Logs: SignalConfig{ Queries: []string{ diff --git a/processor/transformprocessor/factory_test.go b/processor/transformprocessor/factory_test.go index ddad9634bd2b..b1958926ff9a 100644 --- a/processor/transformprocessor/factory_test.go +++ b/processor/transformprocessor/factory_test.go @@ -27,6 +27,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs" @@ -50,7 +51,7 @@ func TestFactory_CreateDefaultConfig(t *testing.T) { Metrics: SignalConfig{ Queries: []string{}, - functions: traces.DefaultFunctions(), + functions: metrics.DefaultFunctions(), }, Logs: SignalConfig{ Queries: []string{}, diff --git a/processor/transformprocessor/internal/common/functions.go b/processor/transformprocessor/internal/common/functions.go index 3b5ae2267a0e..31022bcebada 100644 --- a/processor/transformprocessor/internal/common/functions.go +++ b/processor/transformprocessor/internal/common/functions.go @@ -247,6 +247,11 @@ func NewFunctionCall(inv Invocation, functions map[string]interface{}, pathParse return nil, fmt.Errorf("invalid argument at position %v, must be a string", i) } args = append(args, reflect.ValueOf(*argDef.String)) + case "bool": + if argDef.Bool == nil { + return nil, fmt.Errorf("invalid argument at position %v, must be a bool", i) + } + args = append(args, reflect.ValueOf(bool(*argDef.Bool))) } } val := reflect.ValueOf(f) diff --git a/processor/transformprocessor/internal/common/parser.go b/processor/transformprocessor/internal/common/parser.go index 95e6cf9bcbd7..7bc3ed109f26 100644 --- a/processor/transformprocessor/internal/common/parser.go +++ b/processor/transformprocessor/internal/common/parser.go @@ -20,6 +20,15 @@ import ( "go.uber.org/multierr" ) +// Type for capturing booleans, see: +// https://github.com/alecthomas/participle#capturing-boolean-value +type Boolean bool + +func (b *Boolean) Capture(values []string) error { + *b = values[0] == "true" + return nil +} + // ParsedQuery represents a parsed query. It is the entry point into the query DSL. // nolint:govet type ParsedQuery struct { @@ -50,6 +59,7 @@ type Value struct { String *string `| @String` Float *float64 `| @Float` Int *int64 `| @Int` + Bool *Boolean `| @("true" | "false")` Path *Path `| @@ )` } diff --git a/processor/transformprocessor/internal/common/parser_test.go b/processor/transformprocessor/internal/common/parser_test.go index d29be9af17ff..cd3000e26631 100644 --- a/processor/transformprocessor/internal/common/parser_test.go +++ b/processor/transformprocessor/internal/common/parser_test.go @@ -278,6 +278,40 @@ func Test_parse(t *testing.T) { Condition: nil, }, }, + { + query: `convert_gauge_to_sum("cumulative", false)`, + expected: &ParsedQuery{ + Invocation: Invocation{ + Function: "convert_gauge_to_sum", + Arguments: []Value{ + { + String: testhelper.Strp("cumulative"), + }, + { + Bool: (*Boolean)(testhelper.Boolp(false)), + }, + }, + }, + Condition: nil, + }, + }, + { + query: `convert_gauge_to_sum("cumulative", true)`, + expected: &ParsedQuery{ + Invocation: Invocation{ + Function: "convert_gauge_to_sum", + Arguments: []Value{ + { + String: testhelper.Strp("cumulative"), + }, + { + Bool: (*Boolean)(testhelper.Boolp(true)), + }, + }, + }, + Condition: nil, + }, + }, } for _, tt := range tests { diff --git a/processor/transformprocessor/internal/common/testhelper/testhelper.go b/processor/transformprocessor/internal/common/testhelper/testhelper.go index 154034c32ac8..92b1dd071703 100644 --- a/processor/transformprocessor/internal/common/testhelper/testhelper.go +++ b/processor/transformprocessor/internal/common/testhelper/testhelper.go @@ -25,3 +25,7 @@ func Floatp(f float64) *float64 { func Intp(i int64) *int64 { return &i } + +func Boolp(b bool) *bool { + return &b +} diff --git a/processor/transformprocessor/internal/metrics/functions.go b/processor/transformprocessor/internal/metrics/functions.go index 736bb47b86be..10c10b86252c 100644 --- a/processor/transformprocessor/internal/metrics/functions.go +++ b/processor/transformprocessor/internal/metrics/functions.go @@ -15,10 +15,83 @@ package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics" import ( + "fmt" + + "go.opentelemetry.io/collector/pdata/pmetric" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) +// registry is a map of names to functions for metrics pipelines +var registry = map[string]interface{}{ + "convert_sum_to_gauge": convertSumToGauge, + "convert_gauge_to_sum": convertGaugeToSum, +} + +func init() { + // Init metrics registry with default functions common to all signals + for k, v := range common.DefaultFunctions() { + registry[k] = v + } +} + func DefaultFunctions() map[string]interface{} { - // No metric-only functions yet. - return common.DefaultFunctions() + return registry +} + +func convertSumToGauge() (common.ExprFunc, error) { + return func(ctx common.TransformContext) interface{} { + mtc, ok := ctx.(metricTransformContext) + if !ok { + return nil + } + + metric := mtc.GetMetric() + if metric.DataType() != pmetric.MetricDataTypeSum { + return nil + } + + dps := metric.Sum().DataPoints() + + metric.SetDataType(pmetric.MetricDataTypeGauge) + // Setting the data type removed all the data points, so we must copy them back to the metric. + dps.CopyTo(metric.Gauge().DataPoints()) + + return nil + }, nil +} + +func convertGaugeToSum(stringAggTemp string, monotonic bool) (common.ExprFunc, error) { + var aggTemp pmetric.MetricAggregationTemporality + switch stringAggTemp { + case "delta": + aggTemp = pmetric.MetricAggregationTemporalityDelta + case "cumulative": + aggTemp = pmetric.MetricAggregationTemporalityCumulative + default: + return nil, fmt.Errorf("unknown aggregation temporality: %s", stringAggTemp) + } + + return func(ctx common.TransformContext) interface{} { + mtc, ok := ctx.(metricTransformContext) + if !ok { + return nil + } + + metric := mtc.GetMetric() + if metric.DataType() != pmetric.MetricDataTypeGauge { + return nil + } + + dps := metric.Gauge().DataPoints() + + metric.SetDataType(pmetric.MetricDataTypeSum) + metric.Sum().SetAggregationTemporality(aggTemp) + metric.Sum().SetIsMonotonic(monotonic) + + // Setting the data type removed all the data points, so we must copy them back to the metric. + dps.CopyTo(metric.Sum().DataPoints()) + + return nil + }, nil } diff --git a/processor/transformprocessor/internal/metrics/functions_test.go b/processor/transformprocessor/internal/metrics/functions_test.go index 67ac25a8d6f6..05b41203decb 100644 --- a/processor/transformprocessor/internal/metrics/functions_test.go +++ b/processor/transformprocessor/internal/metrics/functions_test.go @@ -26,6 +26,33 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common/testhelper" ) +func Test_newFunctionCall_invalid(t *testing.T) { + tests := []struct { + name string + inv common.Invocation + }{ + { + name: "invalid aggregation temporality", + inv: common.Invocation{ + Function: "convert_gauge_to_sum", + Arguments: []common.Value{ + { + String: testhelper.Strp("invalid_agg_temp"), + }, + { + Bool: (*common.Boolean)(testhelper.Boolp(true)), + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := common.NewFunctionCall(tt.inv, DefaultFunctions(), ParsePath) + assert.Error(t, err) + }) + } +} func Test_newFunctionCall_NumberDataPoint(t *testing.T) { input := pmetric.NewNumberDataPoint() attrs := pcommon.NewMap() @@ -1678,3 +1705,166 @@ func Test_newFunctionCall_Metric(t *testing.T) { }) } } + +func Test_newFunctionCall_Metric_Sum(t *testing.T) { + input := pmetric.NewMetric() + input.SetDataType(pmetric.MetricDataTypeSum) + + dp1 := input.Sum().DataPoints().AppendEmpty() + dp1.SetIntVal(10) + + dp2 := input.Sum().DataPoints().AppendEmpty() + dp2.SetDoubleVal(14.5) + + tests := []struct { + name string + inv common.Invocation + want func(pmetric.Metric) + }{ + { + name: "convert sum to gauge", + inv: common.Invocation{ + Function: "convert_sum_to_gauge", + Arguments: []common.Value{}, + }, + want: func(metric pmetric.Metric) { + input.CopyTo(metric) + + dps := input.Sum().DataPoints() + metric.SetDataType(pmetric.MetricDataTypeGauge) + dps.CopyTo(metric.Gauge().DataPoints()) + }, + }, + { + name: "convert gauge to sum (noop)", + inv: common.Invocation{ + Function: "convert_gauge_to_sum", + Arguments: []common.Value{ + { + String: testhelper.Strp("delta"), + }, + { + Bool: (*common.Boolean)(testhelper.Boolp(false)), + }, + }, + }, + want: func(metric pmetric.Metric) { + input.CopyTo(metric) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + metric := pmetric.NewMetric() + input.CopyTo(metric) + + evaluate, err := common.NewFunctionCall(tt.inv, DefaultFunctions(), ParsePath) + assert.NoError(t, err) + evaluate(metricTransformContext{ + metric: metric, + il: pcommon.NewInstrumentationScope(), + resource: pcommon.NewResource(), + }) + + expected := pmetric.NewMetric() + tt.want(expected) + assert.Equal(t, expected, metric) + }) + } +} + +func Test_newFunctionCall_Metric_Gauge(t *testing.T) { + input := pmetric.NewMetric() + input.SetDataType(pmetric.MetricDataTypeGauge) + + dp1 := input.Gauge().DataPoints().AppendEmpty() + dp1.SetIntVal(10) + + dp2 := input.Gauge().DataPoints().AppendEmpty() + dp2.SetDoubleVal(14.5) + + tests := []struct { + name string + inv common.Invocation + want func(pmetric.Metric) + }{ + { + name: "convert gauge to sum 1", + inv: common.Invocation{ + Function: "convert_gauge_to_sum", + Arguments: []common.Value{ + { + String: testhelper.Strp("cumulative"), + }, + { + Bool: (*common.Boolean)(testhelper.Boolp(false)), + }, + }, + }, + want: func(metric pmetric.Metric) { + input.CopyTo(metric) + + dps := input.Gauge().DataPoints() + + metric.SetDataType(pmetric.MetricDataTypeSum) + metric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative) + metric.Sum().SetIsMonotonic(false) + + dps.CopyTo(metric.Sum().DataPoints()) + }, + }, + { + name: "convert gauge to sum 2", + inv: common.Invocation{ + Function: "convert_gauge_to_sum", + Arguments: []common.Value{ + { + String: testhelper.Strp("delta"), + }, + { + Bool: (*common.Boolean)(testhelper.Boolp(true)), + }, + }, + }, + want: func(metric pmetric.Metric) { + input.CopyTo(metric) + + dps := input.Gauge().DataPoints() + + metric.SetDataType(pmetric.MetricDataTypeSum) + metric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta) + metric.Sum().SetIsMonotonic(true) + + dps.CopyTo(metric.Sum().DataPoints()) + }, + }, + { + name: "convert sum to gauge (no-op)", + inv: common.Invocation{ + Function: "convert_sum_to_gauge", + Arguments: []common.Value{}, + }, + want: func(metric pmetric.Metric) { + input.CopyTo(metric) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + metric := pmetric.NewMetric() + input.CopyTo(metric) + + evaluate, err := common.NewFunctionCall(tt.inv, DefaultFunctions(), ParsePath) + assert.NoError(t, err) + evaluate(metricTransformContext{ + metric: metric, + il: pcommon.NewInstrumentationScope(), + resource: pcommon.NewResource(), + }) + + expected := pmetric.NewMetric() + tt.want(expected) + assert.Equal(t, expected, metric) + }) + } +}