From 4bf8ac9aecf371c8a94284216e349dcbd7decf47 Mon Sep 17 00:00:00 2001 From: odubajDT <93584209+odubajDT@users.noreply.github.com> Date: Fri, 6 Sep 2024 19:20:49 +0200 Subject: [PATCH] [processor/transform] introduce aggregate_on_attribute_value function for metrics (#33423) **Link to tracking Issue:** #16224 **Changes:** - implemented `aggregate_on_attribute_value` function - tests - documentation **Depends on** https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/33669 --------- Signed-off-by: odubajDT --- .../feat_16224_aggregate_label_value.yaml | 27 + processor/transformprocessor/README.md | 49 +- ...unc_agregate_on_attribute_value_metrics.go | 70 ++ ...gregate_on_attribute_value_metrics_test.go | 661 ++++++++++++++++++ .../internal/metrics/functions.go | 1 + .../internal/metrics/functions_test.go | 1 + .../internal/metrics/processor_test.go | 47 ++ 7 files changed, 855 insertions(+), 1 deletion(-) create mode 100644 .chloggen/feat_16224_aggregate_label_value.yaml create mode 100644 processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics.go create mode 100644 processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics_test.go diff --git a/.chloggen/feat_16224_aggregate_label_value.yaml b/.chloggen/feat_16224_aggregate_label_value.yaml new file mode 100644 index 000000000000..c6c69f8b42ed --- /dev/null +++ b/.chloggen/feat_16224_aggregate_label_value.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: transformprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Support aggregating metrics based on their attribute values and substituting the values with a new value." + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [16224] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/processor/transformprocessor/README.md b/processor/transformprocessor/README.md index abed1de6ac36..57441da0b189 100644 --- a/processor/transformprocessor/README.md +++ b/processor/transformprocessor/README.md @@ -220,6 +220,7 @@ In addition to OTTL functions, the processor defines its own functions to help w - [copy_metric](#copy_metric) - [scale_metric](#scale_metric) - [aggregate_on_attributes](#aggregate_on_attributes) +- [aggregate_on_attribute_value](#aggregate_on_attribute_value) ### convert_sum_to_gauge @@ -374,10 +375,12 @@ Examples: `aggregate_on_attributes(function, Optional[attributes])` -The `aggregate_on_attributes` function aggregates all datapoints in the metric based on the supplied attributes. `function` is a case-sensitive string that represents the aggregation function and `attributes` is an optional list of attribute keys to aggregate upon. +The `aggregate_on_attributes` function aggregates all datapoints in the metric based on the supplied attributes. `function` is a case-sensitive string that represents the aggregation function and `attributes` is an optional list of attribute keys of type string to aggregate upon. `aggregate_on_attributes` function removes all attributes that are present in datapoints except the ones that are specified in the `attributes` parameter. If `attributes` parameter is not set, all attributes are removed from datapoints. Afterwards all datapoints are aggregated depending on the attributes left (none or the ones present in the list). +**NOTE:** This function is supported only in `metric` context. + The following metric types can be aggregated: - sum @@ -415,6 +418,50 @@ statements: To aggregate only using a specified set of attributes, you can use `keep_matching_keys`. +### aggregate_on_attribute_value + +`aggregate_on_attribute_value(function, attribute, values, newValue)` + +The `aggregate_on_attribute_value` function aggregates all datapoints in the metric containing the attribute `attribute` (type string) with one of the values present in the `values` parameter (list of strings) into a single datapoint where the attribute has the value `newValue` (type string). `function` is a case-sensitive string that represents the aggregation function. + +**NOTE:** This function is supported only in `metric` context. + +The following metric types can be aggregated: + +- sum +- gauge +- histogram +- exponential histogram + +Supported aggregation functions are: + +- sum +- max +- min +- mean +- median +- count + +**NOTE:** Only the `sum` agregation function is supported for histogram and exponential histogram datatypes. + +Examples: + +- `aggregate_on_attribute_value("sum", "attr1", ["val1", "val2"], "new_val") where name == "system.memory.usage"` + +The `aggregate_on_attribute_value` function can also be used in conjunction with +[keep_matching_keys](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/ottlfuncs#keep_matching_keys) or +[delete_matching_keys](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/ottlfuncs#delete_matching_keys). + +For example, to remove attribute keys matching a regex and aggregate the metrics on the remaining attributes, you can perform the following statement sequence: + +```yaml +statements: + - delete_matching_keys(attributes, "(?i).*myRegex.*") where name == "system.memory.usage" + - aggregate_on_attribute_value("sum", "attr1", ["val1", "val2"], "new_val") where name == "system.memory.usage" +``` + +To aggregate only using a specified set of attributes, you can use `keep_matching_keys`. + ## Examples ### Perform transformation if field does not exist diff --git a/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics.go b/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics.go new file mode 100644 index 000000000000..0c6b5fd12961 --- /dev/null +++ b/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics.go @@ -0,0 +1,70 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics" + +import ( + "context" + "fmt" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" +) + +type aggregateOnAttributeValueArguments struct { + AggregationFunction string + Attribute string + Values []string + NewValue string +} + +func newAggregateOnAttributeValueFactory() ottl.Factory[ottlmetric.TransformContext] { + return ottl.NewFactory("aggregate_on_attribute_value", &aggregateOnAttributeValueArguments{}, createAggregateOnAttributeValueFunction) +} + +func createAggregateOnAttributeValueFunction(_ ottl.FunctionContext, oArgs ottl.Arguments) (ottl.ExprFunc[ottlmetric.TransformContext], error) { + args, ok := oArgs.(*aggregateOnAttributeValueArguments) + + if !ok { + return nil, fmt.Errorf("AggregateOnAttributeValueFactory args must be of type *AggregateOnAttributeValueArguments") + } + + t, err := aggregateutil.ConvertToAggregationFunction(args.AggregationFunction) + if err != nil { + return nil, fmt.Errorf("invalid aggregation function: '%s', valid options: %s", err.Error(), aggregateutil.GetSupportedAggregationFunctionsList()) + } + + return AggregateOnAttributeValue(t, args.Attribute, args.Values, args.NewValue) +} + +func AggregateOnAttributeValue(aggregationType aggregateutil.AggregationType, attribute string, values []string, newValue string) (ottl.ExprFunc[ottlmetric.TransformContext], error) { + return func(_ context.Context, tCtx ottlmetric.TransformContext) (any, error) { + metric := tCtx.GetMetric() + + aggregateutil.RangeDataPointAttributes(metric, func(attrs pcommon.Map) bool { + val, ok := attrs.Get(attribute) + if !ok { + return true + } + + for _, v := range values { + if val.Str() == v { + val.SetStr(newValue) + } + } + return true + }) + ag := aggregateutil.AggGroups{} + newMetric := pmetric.NewMetric() + aggregateutil.CopyMetricDetails(metric, newMetric) + aggregateutil.GroupDataPoints(metric, &ag) + aggregateutil.MergeDataPoints(newMetric, aggregationType, ag) + newMetric.MoveTo(metric) + + return nil, nil + }, nil +} diff --git a/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics_test.go b/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics_test.go new file mode 100644 index 000000000000..0f84ee276d12 --- /dev/null +++ b/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics_test.go @@ -0,0 +1,661 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metrics + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" +) + +func Test_aggregateOnAttributeValues(t *testing.T) { + tests := []struct { + name string + input pmetric.Metric + t aggregateutil.AggregationType + attribute string + values []string + newValue string + want func(pmetric.MetricSlice) + }{ + { + name: "non-existing value", + input: getTestSumMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Sum, + values: []string{ + "test44", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(100) + input.Attributes().PutStr("test", "test1") + + input2 := sumMetric.Sum().DataPoints().AppendEmpty() + input2.SetDoubleValue(50) + input2.Attributes().PutStr("test", "test2") + }, + }, + { + name: "empty values", + input: getTestSumMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Sum, + values: []string{}, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(100) + input.Attributes().PutStr("test", "test1") + + input2 := sumMetric.Sum().DataPoints().AppendEmpty() + input2.SetDoubleValue(50) + input2.Attributes().PutStr("test", "test2") + }, + }, + { + name: "non-existing attribute", + input: getTestSumMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Sum, + values: []string{ + "test1", + }, + attribute: "testyy", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(100) + input.Attributes().PutStr("test", "test1") + + input2 := sumMetric.Sum().DataPoints().AppendEmpty() + input2.SetDoubleValue(50) + input2.Attributes().PutStr("test", "test2") + }, + }, + { + name: "non-matching attribute", + input: getTestSumMetricMultipleAggregateOnAttributeValueAdditionalAttribute(), + t: aggregateutil.Sum, + values: []string{ + "test1", + }, + attribute: "testyy", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(100) + input.Attributes().PutStr("test", "test1") + + input2 := sumMetric.Sum().DataPoints().AppendEmpty() + input2.SetDoubleValue(50) + input2.Attributes().PutStr("test", "test2") + input2.Attributes().PutStr("test3", "test3") + }, + }, + { + name: "duplicated values", + input: getTestSumMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Sum, + values: []string{ + "test1", + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(150) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "2 datapoints aggregated, one left unaggregated", + input: getTestSumMetricMultipleAggregateOnAttributeValueOdd(), + t: aggregateutil.Sum, + values: []string{ + "test1", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(150) + input.Attributes().PutStr("test", "test_new") + + input3 := sumMetric.Sum().DataPoints().AppendEmpty() + input3.SetDoubleValue(30) + input3.Attributes().PutStr("test", "test2") + }, + }, + { + name: "sum sum", + input: getTestSumMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Sum, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(150) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "sum mean", + input: getTestSumMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Mean, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(75) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "sum max", + input: getTestSumMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Max, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(100) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "sum min", + input: getTestSumMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Min, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(50) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "sum count", + input: getTestSumMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Count, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(2) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "sum median even", + input: getTestSumMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Median, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(75) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "sum median odd", + input: getTestSumMetricMultipleAggregateOnAttributeValueOdd(), + t: aggregateutil.Median, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(50) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "gauge sum", + input: getTestGaugeMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Sum, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := metricInput.Gauge().DataPoints().AppendEmpty() + input.SetIntValue(17) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "gauge mean", + input: getTestGaugeMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Mean, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := metricInput.Gauge().DataPoints().AppendEmpty() + input.SetIntValue(8) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "gauge count", + input: getTestGaugeMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Count, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := metricInput.Gauge().DataPoints().AppendEmpty() + input.SetIntValue(2) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "gauge median even", + input: getTestGaugeMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Median, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := metricInput.Gauge().DataPoints().AppendEmpty() + input.SetIntValue(8) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "gauge median odd", + input: getTestGaugeMetricMultipleAggregateOnAttributeValueOdd(), + t: aggregateutil.Median, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := metricInput.Gauge().DataPoints().AppendEmpty() + input.SetIntValue(5) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "gauge min", + input: getTestGaugeMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Min, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := metricInput.Gauge().DataPoints().AppendEmpty() + input.SetIntValue(5) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "gauge max", + input: getTestGaugeMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Max, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := metricInput.Gauge().DataPoints().AppendEmpty() + input.SetIntValue(12) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "histogram", + input: getTestHistogramMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Sum, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyHistogram() + metricInput.SetName("histogram_metric") + metricInput.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + + input := metricInput.Histogram().DataPoints().AppendEmpty() + input.SetCount(10) + input.SetSum(25) + + input.BucketCounts().Append(4, 6) + input.ExplicitBounds().Append(1) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "exponential histogram", + input: getTestExponentialHistogramMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Sum, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyExponentialHistogram() + metricInput.SetName("exponential_histogram_metric") + metricInput.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + + input := metricInput.ExponentialHistogram().DataPoints().AppendEmpty() + input.SetScale(1) + input.SetCount(10) + input.SetSum(25) + input.Attributes().PutStr("test", "test_new") + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + evaluate, err := AggregateOnAttributeValue(tt.t, tt.attribute, tt.values, tt.newValue) + assert.NoError(t, err) + + _, err = evaluate(nil, ottlmetric.NewTransformContext(tt.input, pmetric.NewMetricSlice(), pcommon.NewInstrumentationScope(), pcommon.NewResource(), pmetric.NewScopeMetrics(), pmetric.NewResourceMetrics())) + require.NoError(t, err) + + actualMetric := pmetric.NewMetricSlice() + tt.input.CopyTo(actualMetric.AppendEmpty()) + + if tt.want != nil { + expected := pmetric.NewMetricSlice() + tt.want(expected) + + expectedMetrics := pmetric.NewMetrics() + sl := expectedMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() + expected.CopyTo(sl) + + actualMetrics := pmetric.NewMetrics() + sl2 := actualMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() + actualMetric.CopyTo(sl2) + + require.NoError(t, pmetrictest.CompareMetrics(expectedMetrics, actualMetrics, pmetrictest.IgnoreMetricDataPointsOrder())) + } + }) + } +} + +func Test_createAggregateOnAttributeValueFunction(t *testing.T) { + // invalid input arguments + _, e := createAggregateOnAttributeValueFunction(ottl.FunctionContext{}, nil) + require.Contains(t, e.Error(), "AggregateOnAttributeValueFactory args must be of type *AggregateOnAttributeValueArguments") + + // invalid aggregation function + _, e = createAggregateOnAttributeValueFunction(ottl.FunctionContext{}, &aggregateOnAttributeValueArguments{ + AggregationFunction: "invalid", + Attribute: "attr", + Values: []string{"val"}, + NewValue: "newVal", + }) + require.Contains(t, e.Error(), "invalid aggregation function") +} + +func getTestSumMetricMultipleAggregateOnAttributeValue() pmetric.Metric { + metricInput := pmetric.NewMetric() + metricInput.SetEmptySum() + metricInput.SetName("sum_metric") + + input := metricInput.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(100) + input.Attributes().PutStr("test", "test1") + + input2 := metricInput.Sum().DataPoints().AppendEmpty() + input2.SetDoubleValue(50) + input2.Attributes().PutStr("test", "test2") + + return metricInput +} + +func getTestSumMetricMultipleAggregateOnAttributeValueAdditionalAttribute() pmetric.Metric { + metricInput := pmetric.NewMetric() + metricInput.SetEmptySum() + metricInput.SetName("sum_metric") + + input := metricInput.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(100) + input.Attributes().PutStr("test", "test1") + + input2 := metricInput.Sum().DataPoints().AppendEmpty() + input2.SetDoubleValue(50) + input2.Attributes().PutStr("test", "test2") + input2.Attributes().PutStr("test3", "test3") + + return metricInput +} + +func getTestSumMetricMultipleAggregateOnAttributeValueOdd() pmetric.Metric { + metricInput := pmetric.NewMetric() + metricInput.SetEmptySum() + metricInput.SetName("sum_metric") + + input := metricInput.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(100) + input.Attributes().PutStr("test", "test1") + + input2 := metricInput.Sum().DataPoints().AppendEmpty() + input2.SetDoubleValue(50) + input2.Attributes().PutStr("test", "test1") + + input3 := metricInput.Sum().DataPoints().AppendEmpty() + input3.SetDoubleValue(30) + input3.Attributes().PutStr("test", "test2") + + return metricInput +} + +func getTestGaugeMetricMultipleAggregateOnAttributeValue() pmetric.Metric { + metricInput := pmetric.NewMetric() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := metricInput.Gauge().DataPoints().AppendEmpty() + input.SetIntValue(12) + input.Attributes().PutStr("test", "test1") + + input2 := metricInput.Gauge().DataPoints().AppendEmpty() + input2.SetIntValue(5) + input2.Attributes().PutStr("test", "test2") + + return metricInput +} + +func getTestGaugeMetricMultipleAggregateOnAttributeValueOdd() pmetric.Metric { + metricInput := pmetric.NewMetric() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := metricInput.Gauge().DataPoints().AppendEmpty() + input.SetIntValue(12) + input.Attributes().PutStr("test", "test1") + + input2 := metricInput.Gauge().DataPoints().AppendEmpty() + input2.SetIntValue(5) + input2.Attributes().PutStr("test", "test2") + + input3 := metricInput.Gauge().DataPoints().AppendEmpty() + input3.SetIntValue(3) + input3.Attributes().PutStr("test", "test1") + + return metricInput +} + +func getTestHistogramMetricMultipleAggregateOnAttributeValue() pmetric.Metric { + metricInput := pmetric.NewMetric() + metricInput.SetEmptyHistogram() + metricInput.SetName("histogram_metric") + metricInput.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + + input := metricInput.Histogram().DataPoints().AppendEmpty() + input.SetCount(5) + input.SetSum(12.34) + + input.BucketCounts().Append(2, 3) + input.ExplicitBounds().Append(1) + input.Attributes().PutStr("test", "test1") + + input2 := metricInput.Histogram().DataPoints().AppendEmpty() + input2.SetCount(5) + input2.SetSum(12.66) + + input2.BucketCounts().Append(2, 3) + input2.ExplicitBounds().Append(1) + input2.Attributes().PutStr("test", "test2") + return metricInput +} + +func getTestExponentialHistogramMetricMultipleAggregateOnAttributeValue() pmetric.Metric { + metricInput := pmetric.NewMetric() + metricInput.SetEmptyExponentialHistogram() + metricInput.SetName("exponential_histogram_metric") + metricInput.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + + input := metricInput.ExponentialHistogram().DataPoints().AppendEmpty() + input.SetScale(1) + input.SetCount(5) + input.SetSum(12.34) + input.Attributes().PutStr("test", "test1") + + input2 := metricInput.ExponentialHistogram().DataPoints().AppendEmpty() + input2.SetScale(1) + input2.SetCount(5) + input2.SetSum(12.66) + input2.Attributes().PutStr("test", "test2") + return metricInput +} diff --git a/processor/transformprocessor/internal/metrics/functions.go b/processor/transformprocessor/internal/metrics/functions.go index e9e58b08a212..e9a4462f0a5e 100644 --- a/processor/transformprocessor/internal/metrics/functions.go +++ b/processor/transformprocessor/internal/metrics/functions.go @@ -51,6 +51,7 @@ func MetricFunctions() map[string]ottl.Factory[ottlmetric.TransformContext] { newCopyMetricFactory(), newScaleMetricFactory(), newAggregateOnAttributesFactory(), + newAggregateOnAttributeValueFactory(), ) if UseConvertBetweenSumAndGaugeMetricContext.IsEnabled() { diff --git a/processor/transformprocessor/internal/metrics/functions_test.go b/processor/transformprocessor/internal/metrics/functions_test.go index 98a08e18652f..a54c9274d1a0 100644 --- a/processor/transformprocessor/internal/metrics/functions_test.go +++ b/processor/transformprocessor/internal/metrics/functions_test.go @@ -62,6 +62,7 @@ func Test_MetricFunctions(t *testing.T) { expected["convert_sum_to_gauge"] = newConvertSumToGaugeFactory() expected["convert_gauge_to_sum"] = newConvertGaugeToSumFactory() expected["aggregate_on_attributes"] = newAggregateOnAttributesFactory() + expected["aggregate_on_attribute_value"] = newAggregateOnAttributeValueFactory() expected["extract_sum_metric"] = newExtractSumMetricFactory() expected["extract_count_metric"] = newExtractCountMetricFactory() expected["copy_metric"] = newCopyMetricFactory() diff --git a/processor/transformprocessor/internal/metrics/processor_test.go b/processor/transformprocessor/internal/metrics/processor_test.go index 81140cd6babf..6087fcd70d74 100644 --- a/processor/transformprocessor/internal/metrics/processor_test.go +++ b/processor/transformprocessor/internal/metrics/processor_test.go @@ -245,6 +245,20 @@ func Test_ProcessMetrics_MetricContext(t *testing.T) { dataPoint1.Attributes().PutStr("flags", "A|B|C") dataPoint1.Attributes().PutStr("total.string", "123456789") + dataPoints.CopyTo(m.Sum().DataPoints()) + }, + }, + { + statements: []string{`aggregate_on_attribute_value("sum", "attr1", ["test1", "test2"], "test") where name == "operationE"`}, + want: func(td pmetric.Metrics) { + m := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4) + + dataPoints := pmetric.NewNumberDataPointSlice() + dataPoint1 := dataPoints.AppendEmpty() + dataPoint1.SetStartTimestamp(StartTimestamp) + dataPoint1.SetDoubleValue(4.7) + dataPoint1.Attributes().PutStr("attr1", "test") + dataPoints.CopyTo(m.Sum().DataPoints()) }, }, @@ -290,6 +304,8 @@ func Test_ProcessMetrics_DataPointContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("test", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("test", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().DataPoints().At(1).Attributes().PutStr("test", "pass") }, }, { @@ -322,6 +338,7 @@ func Test_ProcessMetrics_DataPointContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).SetDescription("test") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).SetDescription("test") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).SetDescription("test") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).SetDescription("test") }, }, { @@ -331,12 +348,14 @@ func Test_ProcessMetrics_DataPointContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).SetUnit("new unit") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).SetUnit("new unit") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).SetUnit("new unit") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).SetUnit("new unit") }, }, { statements: []string{`set(metric.description, "Sum") where metric.type == METRIC_DATA_TYPE_SUM`}, want: func(td pmetric.Metrics) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).SetDescription("Sum") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).SetDescription("Sum") }, }, { @@ -345,12 +364,14 @@ func Test_ProcessMetrics_DataPointContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) }, }, { statements: []string{`set(metric.is_monotonic, true) where metric.is_monotonic == false`}, want: func(td pmetric.Metrics) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().SetIsMonotonic(true) + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().SetIsMonotonic(true) }, }, { @@ -394,6 +415,7 @@ func Test_ProcessMetrics_DataPointContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("attr1", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("attr1", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("attr1", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().DataPoints().At(0).Attributes().PutStr("attr1", "pass") }, }, { @@ -406,6 +428,7 @@ func Test_ProcessMetrics_DataPointContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("attr1", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("attr1", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("attr1", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().DataPoints().At(0).Attributes().PutStr("attr1", "pass") }, }, { @@ -527,6 +550,7 @@ func Test_ProcessMetrics_DataPointContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).SetUnit("new unit") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).SetUnit("new unit") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).SetUnit("new unit") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).SetUnit("new unit") }, }, { @@ -731,6 +755,8 @@ func Test_ProcessMetrics_MixContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("test", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("test", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().DataPoints().At(1).Attributes().PutStr("test", "pass") }, }, { @@ -758,6 +784,8 @@ func Test_ProcessMetrics_MixContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("test", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("test", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().DataPoints().At(1).Attributes().PutStr("test", "pass") }, }, { @@ -811,6 +839,8 @@ func Test_ProcessMetrics_MixContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("test", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("test", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().DataPoints().At(1).Attributes().PutStr("test", "pass") }, }, } @@ -879,6 +909,7 @@ func constructMetrics() pmetric.Metrics { fillMetricTwo(rm0ils0.Metrics().AppendEmpty()) fillMetricThree(rm0ils0.Metrics().AppendEmpty()) fillMetricFour(rm0ils0.Metrics().AppendEmpty()) + fillMetricFive(rm0ils0.Metrics().AppendEmpty()) return td } @@ -978,3 +1009,19 @@ func fillMetricFour(m pmetric.Metric) { quantileDataPoint1.SetQuantile(.95) quantileDataPoint1.SetValue(321) } + +func fillMetricFive(m pmetric.Metric) { + m.SetName("operationE") + m.SetDescription("operationE description") + m.SetUnit("operationE unit") + + dataPoint0 := m.SetEmptySum().DataPoints().AppendEmpty() + dataPoint0.SetStartTimestamp(StartTimestamp) + dataPoint0.SetDoubleValue(1.0) + dataPoint0.Attributes().PutStr("attr1", "test1") + + dataPoint1 := m.Sum().DataPoints().AppendEmpty() + dataPoint1.SetStartTimestamp(StartTimestamp) + dataPoint1.SetDoubleValue(3.7) + dataPoint1.Attributes().PutStr("attr1", "test2") +}