diff --git a/.chloggen/aggregation-metricstransformprocessor.yaml b/.chloggen/aggregation-metricstransformprocessor.yaml new file mode 100644 index 000000000000..e3ae4a72628e --- /dev/null +++ b/.chloggen/aggregation-metricstransformprocessor.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: metricstransformprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Adds the 'median' aggregation type to the Metrics Transform Processor. Also uses the refactored aggregation business logic from internal/core package." + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [16224] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/processor/metricstransformprocessor/README.md b/processor/metricstransformprocessor/README.md index 80f3c594a83c..2cbb3da3237f 100644 --- a/processor/metricstransformprocessor/README.md +++ b/processor/metricstransformprocessor/README.md @@ -86,7 +86,7 @@ processors: # new_name specifies the updated name of the metric; if action is insert or combine, new_name is required new_name: # aggregation_type defines how combined data points will be aggregated; if action is combine, aggregation_type is required - aggregation_type: {sum, mean, min, max, count} + aggregation_type: {sum, mean, min, max, count, median} # submatch_case specifies the case that should be used when adding label values based on regexp submatches when performing a combine action; leave blank to use the submatch value as is submatch_case: {lower, upper} # operations contain a list of operations that will be performed on the resulting metric(s) @@ -106,7 +106,7 @@ processors: # label_set contains a list of labels that will remain after aggregation; if action is aggregate_labels, label_set is required label_set: [labels...] # aggregation_type defines how data points will be aggregated; if action is aggregate_labels or aggregate_label_values, aggregation_type is required - aggregation_type: {sum, mean, min, max, count} + aggregation_type: {sum, mean, min, max, count, median} # experimental_scale specifies the scalar to apply to values experimental_scale: # value_actions contain a list of operations that will be performed on the selected label @@ -273,6 +273,8 @@ operations: aggregation_type: sum ``` +**NOTE:** Only the `sum` aggregation function is supported for histogram and exponential histogram datatypes. + ### Aggregate label values ```yaml # aggregate data points with state label value slab_reclaimable & slab_unreclaimable using summation into slab @@ -286,6 +288,8 @@ operations: aggregation_type: sum ``` +**NOTE:** Only the `sum` aggregation function is supported for histogram and exponential histogram datatypes. + ### Combine metrics ```yaml # convert a set of metrics for each http_method into a single metric with an http_method label, i.e. diff --git a/processor/metricstransformprocessor/config.go b/processor/metricstransformprocessor/config.go index 62587de732ce..ee84e0c33865 100644 --- a/processor/metricstransformprocessor/config.go +++ b/processor/metricstransformprocessor/config.go @@ -3,6 +3,8 @@ package metricstransformprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor" +import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil" + const ( // includeFieldName is the mapstructure field name for Include field includeFieldName = "include" @@ -75,7 +77,7 @@ type transform struct { // AggregationType specifies how to aggregate. // REQUIRED only if Action is COMBINE. - AggregationType aggregationType `mapstructure:"aggregation_type"` + AggregationType aggregateutil.AggregationType `mapstructure:"aggregation_type"` // SubmatchCase specifies what case to use for label values created from regexp submatches. SubmatchCase submatchCase `mapstructure:"submatch_case"` @@ -112,7 +114,7 @@ type Operation struct { LabelSet []string `mapstructure:"label_set"` // AggregationType specifies how to aggregate. - AggregationType aggregationType `mapstructure:"aggregation_type"` + AggregationType aggregateutil.AggregationType `mapstructure:"aggregation_type"` // AggregatedValues is a list of label values to aggregate away. AggregatedValues []string `mapstructure:"aggregated_values"` @@ -216,38 +218,6 @@ func (oa operationAction) isValid() bool { return false } -// aggregationType is the enum to capture the three types of aggregation for the aggregation operation. -type aggregationType string - -const ( - // sum indicates taking the sum of the aggregated data. - sum aggregationType = "sum" - - // mean indicates taking the mean of the aggregated data. - mean aggregationType = "mean" - - // min indicates taking the minimum of the aggregated data. - min aggregationType = "min" - - // max indicates taking the max of the aggregated data. - max aggregationType = "max" - - // count indicates taking the count of the aggregated data. - count aggregationType = "count" -) - -var aggregationTypes = []aggregationType{sum, mean, min, max, count} - -func (at aggregationType) isValid() bool { - for _, aggregationType := range aggregationTypes { - if at == aggregationType { - return true - } - } - - return false -} - // matchType is the enum to capture the two types of matching metric(s) that should have operations applied to them. type matchType string diff --git a/processor/metricstransformprocessor/factory.go b/processor/metricstransformprocessor/factory.go index 84403df79a1d..318ae24845f4 100644 --- a/processor/metricstransformprocessor/factory.go +++ b/processor/metricstransformprocessor/factory.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processorhelper" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor/internal/metadata" ) @@ -88,8 +89,8 @@ func validateConfiguration(config *Config) error { return fmt.Errorf("missing required field %q while %q is %v", groupResourceLabelsFieldName, actionFieldName, Group) } - if transform.AggregationType != "" && !transform.AggregationType.isValid() { - return fmt.Errorf("%q must be in %q", aggregationTypeFieldName, aggregationTypes) + if transform.AggregationType != "" && !transform.AggregationType.IsValid() { + return fmt.Errorf("%q must be in %q", aggregationTypeFieldName, aggregateutil.AggregationTypes) } if transform.SubmatchCase != "" && !transform.SubmatchCase.isValid() { @@ -114,8 +115,8 @@ func validateConfiguration(config *Config) error { return fmt.Errorf("operation %v: missing required field %q while %q is %v", i+1, scaleFieldName, actionFieldName, scaleValue) } - if op.AggregationType != "" && !op.AggregationType.isValid() { - return fmt.Errorf("operation %v: %q must be in %q", i+1, aggregationTypeFieldName, aggregationTypes) + if op.AggregationType != "" && !op.AggregationType.IsValid() { + return fmt.Errorf("operation %v: %q must be in %q", i+1, aggregationTypeFieldName, aggregateutil.AggregationTypes) } } } diff --git a/processor/metricstransformprocessor/factory_test.go b/processor/metricstransformprocessor/factory_test.go index e61db4dad854..fddfb0984bf1 100644 --- a/processor/metricstransformprocessor/factory_test.go +++ b/processor/metricstransformprocessor/factory_test.go @@ -16,6 +16,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/processor/processortest" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor/internal/metadata" ) @@ -87,7 +88,7 @@ func TestCreateProcessors(t *testing.T) { { configName: "config_invalid_aggregationtype.yaml", succeed: false, - errorMessage: fmt.Sprintf("%q must be in %q", aggregationTypeFieldName, aggregationTypes), + errorMessage: fmt.Sprintf("%q must be in %q", aggregationTypeFieldName, aggregateutil.AggregationTypes), }, { configName: "config_invalid_operation_action.yaml", @@ -97,7 +98,7 @@ func TestCreateProcessors(t *testing.T) { { configName: "config_invalid_operation_aggregationtype.yaml", succeed: false, - errorMessage: fmt.Sprintf("operation %v: %q must be in %q", 1, aggregationTypeFieldName, aggregationTypes), + errorMessage: fmt.Sprintf("operation %v: %q must be in %q", 1, aggregationTypeFieldName, aggregateutil.AggregationTypes), }, { configName: "config_invalid_submatchcase.yaml", @@ -221,14 +222,14 @@ func TestCreateProcessorsFilledData(t *testing.T) { { Action: aggregateLabels, LabelSet: []string{"label1", "label2"}, - AggregationType: sum, + AggregationType: aggregateutil.Sum, }, { Action: aggregateLabelValues, Label: "label", AggregatedValues: []string{"value1", "value2"}, NewValue: "new-value", - AggregationType: sum, + AggregationType: aggregateutil.Sum, }, }, }, @@ -265,7 +266,7 @@ func TestCreateProcessorsFilledData(t *testing.T) { configOperation: Operation{ Action: aggregateLabels, LabelSet: []string{"label1", "label2"}, - AggregationType: sum, + AggregationType: aggregateutil.Sum, }, labelSetMap: map[string]bool{ "label1": true, @@ -278,7 +279,7 @@ func TestCreateProcessorsFilledData(t *testing.T) { Label: "label", AggregatedValues: []string{"value1", "value2"}, NewValue: "new-value", - AggregationType: sum, + AggregationType: aggregateutil.Sum, }, aggregatedValuesSet: map[string]bool{ "value1": true, diff --git a/processor/metricstransformprocessor/go.mod b/processor/metricstransformprocessor/go.mod index af0be06b05b7..6107d74dcee4 100644 --- a/processor/metricstransformprocessor/go.mod +++ b/processor/metricstransformprocessor/go.mod @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/metri go 1.21.0 require ( + github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.105.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.105.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.105.0 github.com/stretchr/testify v1.9.0 @@ -73,3 +74,5 @@ retract ( ) replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal diff --git a/processor/metricstransformprocessor/metrics_transform_processor.go b/processor/metricstransformprocessor/metrics_transform_processor.go index d39042340fff..61f51391f7ac 100644 --- a/processor/metricstransformprocessor/metrics_transform_processor.go +++ b/processor/metricstransformprocessor/metrics_transform_processor.go @@ -10,6 +10,8 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil" ) type metricsTransformProcessor struct { @@ -23,7 +25,7 @@ type internalTransform struct { Action ConfigAction NewName string GroupResourceLabels map[string]string - AggregationType aggregationType + AggregationType aggregateutil.AggregationType SubmatchCase submatchCase Operations []internalOperation } diff --git a/processor/metricstransformprocessor/metrics_transform_processor_otlp.go b/processor/metricstransformprocessor/metrics_transform_processor_otlp.go index c102cc9c9048..ad2e3b42c6e8 100644 --- a/processor/metricstransformprocessor/metrics_transform_processor_otlp.go +++ b/processor/metricstransformprocessor/metrics_transform_processor_otlp.go @@ -10,6 +10,8 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil" ) // extractAndRemoveMatchedMetrics extracts matched metrics from ms metric slice and returns a new slice. @@ -440,6 +442,17 @@ func combine(transform internalTransform, metrics pmetric.MetricSlice) pmetric.M return combinedMetric } +// groupMetrics groups all the provided timeseries that will be aggregated together based on all the label values. +// Returns a map of grouped timeseries and the corresponding selected labels +// canBeCombined must be callled before. +func groupMetrics(metrics pmetric.MetricSlice, aggType aggregateutil.AggregationType, to pmetric.Metric) { + ag := aggregateutil.AggGroups{} + for i := 0; i < metrics.Len(); i++ { + aggregateutil.GroupDataPoints(metrics.At(i), &ag) + } + aggregateutil.MergeDataPoints(to, aggType, ag) +} + func copyMetricDetails(from, to pmetric.Metric) { to.SetName(from.Name()) to.SetUnit(from.Unit()) @@ -541,7 +554,13 @@ func transformMetric(metric pmetric.Metric, transform internalTransform) bool { updateLabelOp(metric, op, transform.MetricIncludeFilter) case aggregateLabels: if canChangeMetric { - aggregateLabelsOp(metric, op) + attrs := []string{} + for k, v := range op.labelSetMap { + if v { + attrs = append(attrs, k) + } + } + aggregateLabelsOp(metric, attrs, op.configOperation.AggregationType) } case aggregateLabelValues: if canChangeMetric { diff --git a/processor/metricstransformprocessor/metrics_transform_processor_testcases_test.go b/processor/metricstransformprocessor/metrics_transform_processor_testcases_test.go index ae623023590d..85337e5d842b 100644 --- a/processor/metricstransformprocessor/metrics_transform_processor_testcases_test.go +++ b/processor/metricstransformprocessor/metrics_transform_processor_testcases_test.go @@ -7,6 +7,8 @@ import ( "regexp" "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil" ) type metricsTransformTest struct { @@ -223,7 +225,7 @@ var ( { configOperation: Operation{ Action: aggregateLabels, - AggregationType: sum, + AggregationType: aggregateutil.Sum, LabelSet: []string{"label1"}, }, labelSetMap: map[string]bool{"label1": true}, @@ -251,7 +253,7 @@ var ( { configOperation: Operation{ Action: aggregateLabels, - AggregationType: mean, + AggregationType: aggregateutil.Mean, LabelSet: []string{"label1"}, }, labelSetMap: map[string]bool{"label1": true}, @@ -279,7 +281,7 @@ var ( { configOperation: Operation{ Action: aggregateLabels, - AggregationType: max, + AggregationType: aggregateutil.Max, LabelSet: []string{"label1"}, }, labelSetMap: map[string]bool{"label1": true}, @@ -308,7 +310,7 @@ var ( { configOperation: Operation{ Action: aggregateLabels, - AggregationType: count, + AggregationType: aggregateutil.Count, LabelSet: []string{"label1"}, }, labelSetMap: map[string]bool{"label1": true}, @@ -327,6 +329,35 @@ var ( addIntDatapoint(1, 2, 3, "label1-value1").build(), }, }, + { + name: "metric_label_aggregation_median_int_update", + transforms: []internalTransform{ + { + MetricIncludeFilter: internalFilterStrict{include: "metric1"}, + Action: Update, + Operations: []internalOperation{ + { + configOperation: Operation{ + Action: aggregateLabels, + AggregationType: aggregateutil.Median, + LabelSet: []string{"label1"}, + }, + labelSetMap: map[string]bool{"label1": true}, + }, + }, + }, + }, + in: []pmetric.Metric{ + metricBuilder(pmetric.MetricTypeGauge, "metric1", "label1", "label2"). + addIntDatapoint(1, 2, 1, "label1-value1", "label2-value1"). + addIntDatapoint(1, 2, 4, "label1-value1", "label2-value2"). + addIntDatapoint(1, 2, 2, "label1-value1", "label2-value2").build(), + }, + out: []pmetric.Metric{ + metricBuilder(pmetric.MetricTypeGauge, "metric1", "label1"). + addIntDatapoint(1, 2, 2, "label1-value1").build(), + }, + }, { name: "metric_label_aggregation_min_int_update", transforms: []internalTransform{ @@ -337,7 +368,7 @@ var ( { configOperation: Operation{ Action: aggregateLabels, - AggregationType: min, + AggregationType: aggregateutil.Min, LabelSet: []string{"label1"}, }, labelSetMap: map[string]bool{"label1": true}, @@ -366,7 +397,7 @@ var ( { configOperation: Operation{ Action: aggregateLabels, - AggregationType: sum, + AggregationType: aggregateutil.Sum, LabelSet: []string{"label1"}, }, labelSetMap: map[string]bool{"label1": true}, @@ -394,7 +425,7 @@ var ( { configOperation: Operation{ Action: aggregateLabels, - AggregationType: mean, + AggregationType: aggregateutil.Mean, LabelSet: []string{"label1"}, }, labelSetMap: map[string]bool{"label1": true}, @@ -422,7 +453,7 @@ var ( { configOperation: Operation{ Action: aggregateLabels, - AggregationType: max, + AggregationType: aggregateutil.Max, LabelSet: []string{"label1"}, }, labelSetMap: map[string]bool{"label1": true}, @@ -450,7 +481,35 @@ var ( { configOperation: Operation{ Action: aggregateLabels, - AggregationType: count, + AggregationType: aggregateutil.Count, + LabelSet: []string{"label1"}, + }, + labelSetMap: map[string]bool{"label1": true}, + }, + }, + }, + }, + in: []pmetric.Metric{ + metricBuilder(pmetric.MetricTypeGauge, "metric1", "label1", "label2"). + addDoubleDatapoint(1, 2, 3, "label1-value1", "label2-value1"). + addDoubleDatapoint(1, 2, 1, "label1-value1", "label2-value2").build(), + }, + out: []pmetric.Metric{ + metricBuilder(pmetric.MetricTypeGauge, "metric1", "label1"). + addDoubleDatapoint(1, 2, 2, "label1-value1").build(), + }, + }, + { + name: "metric_label_aggregation_median_double_update", + transforms: []internalTransform{ + { + MetricIncludeFilter: internalFilterStrict{include: "metric1"}, + Action: Update, + Operations: []internalOperation{ + { + configOperation: Operation{ + Action: aggregateLabels, + AggregationType: aggregateutil.Median, LabelSet: []string{"label1"}, }, labelSetMap: map[string]bool{"label1": true}, @@ -478,7 +537,7 @@ var ( { configOperation: Operation{ Action: aggregateLabels, - AggregationType: min, + AggregationType: aggregateutil.Min, LabelSet: []string{"label1"}, }, labelSetMap: map[string]bool{"label1": true}, @@ -512,7 +571,7 @@ var ( { configOperation: Operation{ Action: aggregateLabels, - AggregationType: sum, + AggregationType: aggregateutil.Sum, LabelSet: []string{"label1", "label2"}, }, labelSetMap: map[string]bool{"label1": true, "label2": true}, @@ -548,7 +607,7 @@ var ( configOperation: Operation{ Action: aggregateLabelValues, NewValue: "new/label2-value", - AggregationType: sum, + AggregationType: aggregateutil.Sum, Label: "label2", }, aggregatedValuesSet: map[string]bool{"label2-value1": true, "label2-value2": true}, @@ -583,7 +642,7 @@ var ( { configOperation: Operation{ Action: aggregateLabels, - AggregationType: sum, + AggregationType: aggregateutil.Sum, LabelSet: []string{"label1"}, }, labelSetMap: map[string]bool{"label1": true}, @@ -619,7 +678,7 @@ var ( { configOperation: Operation{ Action: aggregateLabels, - AggregationType: sum, + AggregationType: aggregateutil.Sum, LabelSet: []string{"label1"}, }, labelSetMap: map[string]bool{"label1": true}, @@ -650,7 +709,7 @@ var ( { configOperation: Operation{ Action: aggregateLabels, - AggregationType: mean, + AggregationType: aggregateutil.Mean, LabelSet: []string{"label1"}, }, labelSetMap: map[string]bool{"label1": true}, @@ -1009,7 +1068,7 @@ var ( { configOperation: Operation{ Action: aggregateLabels, - AggregationType: sum, + AggregationType: aggregateutil.Sum, LabelSet: []string{"label1"}, }, labelSetMap: map[string]bool{"label1": true}, @@ -1041,7 +1100,7 @@ var ( configOperation: Operation{ Action: aggregateLabelValues, NewValue: "new/label2-value", - AggregationType: sum, + AggregationType: aggregateutil.Sum, Label: "label2", }, aggregatedValuesSet: map[string]bool{"label2-value1": true, "label2-value2": true}, @@ -1072,7 +1131,7 @@ var ( { configOperation: Operation{ Action: aggregateLabels, - AggregationType: sum, + AggregationType: aggregateutil.Sum, LabelSet: []string{"label1"}, }, labelSetMap: map[string]bool{"label1": true}, @@ -1168,7 +1227,7 @@ var ( MetricIncludeFilter: internalFilterRegexp{include: regexp.MustCompile("^metric[12]$")}, Action: Combine, NewName: "new", - AggregationType: sum, + AggregationType: aggregateutil.Sum, }, }, in: []pmetric.Metric{ @@ -1199,7 +1258,7 @@ var ( { configOperation: Operation{ Action: aggregateLabels, - AggregationType: sum, + AggregationType: aggregateutil.Sum, LabelSet: []string{"$1", "new_label"}, }, labelSetMap: map[string]bool{"$1": true, "new_label": true}, @@ -1244,7 +1303,7 @@ var ( MetricIncludeFilter: internalFilterRegexp{include: regexp.MustCompile("^metric[12]$")}, Action: Combine, NewName: "new", - AggregationType: sum, + AggregationType: aggregateutil.Sum, }, }, in: []pmetric.Metric{ @@ -1265,7 +1324,7 @@ var ( MetricIncludeFilter: internalFilterRegexp{include: regexp.MustCompile("^metric[12]$")}, Action: Combine, NewName: "new", - AggregationType: sum, + AggregationType: aggregateutil.Sum, }, }, in: []pmetric.Metric{ @@ -1286,7 +1345,7 @@ var ( MetricIncludeFilter: internalFilterRegexp{include: regexp.MustCompile("^metric[12]$")}, Action: Combine, NewName: "new", - AggregationType: sum, + AggregationType: aggregateutil.Sum, }, }, in: []pmetric.Metric{ @@ -1309,7 +1368,7 @@ var ( MetricIncludeFilter: internalFilterRegexp{include: regexp.MustCompile("^metric[12]$")}, Action: Combine, NewName: "new", - AggregationType: sum, + AggregationType: aggregateutil.Sum, }, }, in: []pmetric.Metric{ diff --git a/processor/metricstransformprocessor/operation_aggregate_label_values.go b/processor/metricstransformprocessor/operation_aggregate_label_values.go index e597b57d1f40..603bd5a82bc2 100644 --- a/processor/metricstransformprocessor/operation_aggregate_label_values.go +++ b/processor/metricstransformprocessor/operation_aggregate_label_values.go @@ -6,6 +6,8 @@ package metricstransformprocessor // import "github.com/open-telemetry/opentelem import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil" ) // aggregateLabelValuesOp aggregates points that have the label values specified in aggregated_values @@ -22,9 +24,10 @@ func aggregateLabelValuesOp(metric pmetric.Metric, mtpOp internalOperation) { return true }) + ag := aggregateutil.AggGroups{} newMetric := pmetric.NewMetric() copyMetricDetails(metric, newMetric) - ag := groupDataPoints(metric, aggGroups{}) - mergeDataPoints(newMetric, mtpOp.configOperation.AggregationType, ag) + aggregateutil.GroupDataPoints(metric, &ag) + aggregateutil.MergeDataPoints(newMetric, mtpOp.configOperation.AggregationType, ag) newMetric.MoveTo(metric) } diff --git a/processor/metricstransformprocessor/operation_aggregate_labels.go b/processor/metricstransformprocessor/operation_aggregate_labels.go index a0129d2a870c..b322d771a30a 100644 --- a/processor/metricstransformprocessor/operation_aggregate_labels.go +++ b/processor/metricstransformprocessor/operation_aggregate_labels.go @@ -4,283 +4,18 @@ package metricstransformprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor" import ( - "encoding/json" - "math" - - "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" -) -type aggGroups struct { - gauge map[string]pmetric.NumberDataPointSlice - sum map[string]pmetric.NumberDataPointSlice - histogram map[string]pmetric.HistogramDataPointSlice - expHistogram map[string]pmetric.ExponentialHistogramDataPointSlice -} + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil" +) // aggregateLabelsOp aggregates points that have the labels excluded in label_set -func aggregateLabelsOp(metric pmetric.Metric, mtpOp internalOperation) { - filterAttrs(metric, mtpOp.labelSetMap) +func aggregateLabelsOp(metric pmetric.Metric, attributes []string, aggrType aggregateutil.AggregationType) { + ag := aggregateutil.AggGroups{} + aggregateutil.FilterAttrs(metric, attributes) newMetric := pmetric.NewMetric() copyMetricDetails(metric, newMetric) - ag := groupDataPoints(metric, aggGroups{}) - mergeDataPoints(newMetric, mtpOp.configOperation.AggregationType, ag) + aggregateutil.GroupDataPoints(metric, &ag) + aggregateutil.MergeDataPoints(newMetric, aggrType, ag) newMetric.MoveTo(metric) } - -// groupMetrics groups all the provided timeseries that will be aggregated together based on all the label values. -// Returns a map of grouped timeseries and the corresponding selected labels -// canBeCombined must be callled before. -func groupMetrics(metrics pmetric.MetricSlice, aggType aggregationType, to pmetric.Metric) { - var ag aggGroups - for i := 0; i < metrics.Len(); i++ { - ag = groupDataPoints(metrics.At(i), ag) - } - mergeDataPoints(to, aggType, ag) -} - -func groupDataPoints(metric pmetric.Metric, ag aggGroups) aggGroups { - switch metric.Type() { - case pmetric.MetricTypeGauge: - if ag.gauge == nil { - ag.gauge = map[string]pmetric.NumberDataPointSlice{} - } - groupNumberDataPoints(metric.Gauge().DataPoints(), false, ag.gauge) - case pmetric.MetricTypeSum: - if ag.sum == nil { - ag.sum = map[string]pmetric.NumberDataPointSlice{} - } - groupByStartTime := metric.Sum().AggregationTemporality() == pmetric.AggregationTemporalityDelta - groupNumberDataPoints(metric.Sum().DataPoints(), groupByStartTime, ag.sum) - case pmetric.MetricTypeHistogram: - if ag.histogram == nil { - ag.histogram = map[string]pmetric.HistogramDataPointSlice{} - } - groupByStartTime := metric.Histogram().AggregationTemporality() == pmetric.AggregationTemporalityDelta - groupHistogramDataPoints(metric.Histogram().DataPoints(), groupByStartTime, ag.histogram) - case pmetric.MetricTypeExponentialHistogram: - if ag.expHistogram == nil { - ag.expHistogram = map[string]pmetric.ExponentialHistogramDataPointSlice{} - } - groupByStartTime := metric.ExponentialHistogram().AggregationTemporality() == pmetric.AggregationTemporalityDelta - groupExponentialHistogramDataPoints(metric.ExponentialHistogram().DataPoints(), groupByStartTime, ag.expHistogram) - } - return ag -} - -func mergeDataPoints(to pmetric.Metric, aggType aggregationType, ag aggGroups) { - switch to.Type() { - case pmetric.MetricTypeGauge: - mergeNumberDataPoints(ag.gauge, aggType, to.Gauge().DataPoints()) - case pmetric.MetricTypeSum: - mergeNumberDataPoints(ag.sum, aggType, to.Sum().DataPoints()) - case pmetric.MetricTypeHistogram: - mergeHistogramDataPoints(ag.histogram, to.Histogram().DataPoints()) - case pmetric.MetricTypeExponentialHistogram: - mergeExponentialHistogramDataPoints(ag.expHistogram, to.ExponentialHistogram().DataPoints()) - } -} - -func groupNumberDataPoints(dps pmetric.NumberDataPointSlice, useStartTime bool, - dpsByAttrsAndTs map[string]pmetric.NumberDataPointSlice) { - var keyHashParts []any - for i := 0; i < dps.Len(); i++ { - if useStartTime { - keyHashParts = []any{dps.At(i).StartTimestamp().String()} - } - key := dataPointHashKey(dps.At(i).Attributes(), dps.At(i).Timestamp(), keyHashParts...) - if _, ok := dpsByAttrsAndTs[key]; !ok { - dpsByAttrsAndTs[key] = pmetric.NewNumberDataPointSlice() - } - dps.At(i).MoveTo(dpsByAttrsAndTs[key].AppendEmpty()) - } -} - -func groupHistogramDataPoints(dps pmetric.HistogramDataPointSlice, useStartTime bool, - dpsByAttrsAndTs map[string]pmetric.HistogramDataPointSlice) { - for i := 0; i < dps.Len(); i++ { - dp := dps.At(i) - keyHashParts := make([]any, 0, dp.ExplicitBounds().Len()+4) - for b := 0; b < dp.ExplicitBounds().Len(); b++ { - keyHashParts = append(keyHashParts, dp.ExplicitBounds().At(b)) - } - if useStartTime { - keyHashParts = append(keyHashParts, dp.StartTimestamp().String()) - } - - keyHashParts = append(keyHashParts, dp.HasMin(), dp.HasMax(), uint32(dp.Flags())) - key := dataPointHashKey(dps.At(i).Attributes(), dp.Timestamp(), keyHashParts...) - if _, ok := dpsByAttrsAndTs[key]; !ok { - dpsByAttrsAndTs[key] = pmetric.NewHistogramDataPointSlice() - } - dp.MoveTo(dpsByAttrsAndTs[key].AppendEmpty()) - } -} - -func groupExponentialHistogramDataPoints(dps pmetric.ExponentialHistogramDataPointSlice, useStartTime bool, - dpsByAttrsAndTs map[string]pmetric.ExponentialHistogramDataPointSlice) { - for i := 0; i < dps.Len(); i++ { - dp := dps.At(i) - keyHashParts := make([]any, 0, 5) - keyHashParts = append(keyHashParts, dp.Scale(), dp.HasMin(), dp.HasMax(), uint32(dp.Flags()), dp.Negative().Offset(), - dp.Positive().Offset()) - if useStartTime { - keyHashParts = append(keyHashParts, dp.StartTimestamp().String()) - } - key := dataPointHashKey(dps.At(i).Attributes(), dp.Timestamp(), keyHashParts...) - if _, ok := dpsByAttrsAndTs[key]; !ok { - dpsByAttrsAndTs[key] = pmetric.NewExponentialHistogramDataPointSlice() - } - dp.MoveTo(dpsByAttrsAndTs[key].AppendEmpty()) - } -} - -func filterAttrs(metric pmetric.Metric, filterAttrKeys map[string]bool) { - if filterAttrKeys == nil { - return - } - rangeDataPointAttributes(metric, func(attrs pcommon.Map) bool { - attrs.RemoveIf(func(k string, _ pcommon.Value) bool { - return !filterAttrKeys[k] - }) - return true - }) -} - -func dataPointHashKey(atts pcommon.Map, ts pcommon.Timestamp, other ...any) string { - hashParts := []any{atts.AsRaw(), ts.String()} - jsonStr, _ := json.Marshal(append(hashParts, other...)) - return string(jsonStr) -} - -func mergeNumberDataPoints(dpsMap map[string]pmetric.NumberDataPointSlice, agg aggregationType, to pmetric.NumberDataPointSlice) { - for _, dps := range dpsMap { - dp := to.AppendEmpty() - dps.At(0).MoveTo(dp) - switch dp.ValueType() { - case pmetric.NumberDataPointValueTypeDouble: - for i := 1; i < dps.Len(); i++ { - switch agg { - case sum, mean: - dp.SetDoubleValue(dp.DoubleValue() + doubleVal(dps.At(i))) - case max: - dp.SetDoubleValue(math.Max(dp.DoubleValue(), doubleVal(dps.At(i)))) - case min: - dp.SetDoubleValue(math.Min(dp.DoubleValue(), doubleVal(dps.At(i)))) - case count: - dp.SetDoubleValue(float64(dps.Len())) - } - if dps.At(i).StartTimestamp() < dp.StartTimestamp() { - dp.SetStartTimestamp(dps.At(i).StartTimestamp()) - } - } - if agg == mean { - dp.SetDoubleValue(dp.DoubleValue() / float64(dps.Len())) - } - case pmetric.NumberDataPointValueTypeInt: - for i := 1; i < dps.Len(); i++ { - switch agg { - case sum, mean: - dp.SetIntValue(dp.IntValue() + dps.At(i).IntValue()) - case max: - if dp.IntValue() < intVal(dps.At(i)) { - dp.SetIntValue(intVal(dps.At(i))) - } - case min: - if dp.IntValue() > intVal(dps.At(i)) { - dp.SetIntValue(intVal(dps.At(i))) - } - case count: - dp.SetIntValue(int64(dps.Len())) - } - if dps.At(i).StartTimestamp() < dp.StartTimestamp() { - dp.SetStartTimestamp(dps.At(i).StartTimestamp()) - } - } - if agg == mean { - dp.SetIntValue(dp.IntValue() / int64(dps.Len())) - } - } - } -} - -func doubleVal(dp pmetric.NumberDataPoint) float64 { - switch dp.ValueType() { - case pmetric.NumberDataPointValueTypeDouble: - return dp.DoubleValue() - case pmetric.NumberDataPointValueTypeInt: - return float64(dp.IntValue()) - } - return 0 -} - -func intVal(dp pmetric.NumberDataPoint) int64 { - switch dp.ValueType() { - case pmetric.NumberDataPointValueTypeDouble: - return int64(dp.DoubleValue()) - case pmetric.NumberDataPointValueTypeInt: - return dp.IntValue() - } - return 0 -} - -func mergeHistogramDataPoints(dpsMap map[string]pmetric.HistogramDataPointSlice, to pmetric.HistogramDataPointSlice) { - for _, dps := range dpsMap { - dp := to.AppendEmpty() - dps.At(0).MoveTo(dp) - counts := dp.BucketCounts() - for i := 1; i < dps.Len(); i++ { - if dps.At(i).Count() == 0 { - continue - } - dp.SetCount(dp.Count() + dps.At(i).Count()) - dp.SetSum(dp.Sum() + dps.At(i).Sum()) - if dp.HasMin() && dp.Min() > dps.At(i).Min() { - dp.SetMin(dps.At(i).Min()) - } - if dp.HasMax() && dp.Max() < dps.At(i).Max() { - dp.SetMax(dps.At(i).Max()) - } - for b := 0; b < dps.At(i).BucketCounts().Len(); b++ { - counts.SetAt(b, counts.At(b)+dps.At(i).BucketCounts().At(b)) - } - dps.At(i).Exemplars().MoveAndAppendTo(dp.Exemplars()) - if dps.At(i).StartTimestamp() < dp.StartTimestamp() { - dp.SetStartTimestamp(dps.At(i).StartTimestamp()) - } - } - } -} - -func mergeExponentialHistogramDataPoints(dpsMap map[string]pmetric.ExponentialHistogramDataPointSlice, - to pmetric.ExponentialHistogramDataPointSlice) { - for _, dps := range dpsMap { - dp := to.AppendEmpty() - dps.At(0).MoveTo(dp) - negatives := dp.Negative().BucketCounts() - positives := dp.Positive().BucketCounts() - for i := 1; i < dps.Len(); i++ { - if dps.At(i).Count() == 0 { - continue - } - dp.SetCount(dp.Count() + dps.At(i).Count()) - dp.SetSum(dp.Sum() + dps.At(i).Sum()) - if dp.HasMin() && dp.Min() > dps.At(i).Min() { - dp.SetMin(dps.At(i).Min()) - } - if dp.HasMax() && dp.Max() < dps.At(i).Max() { - dp.SetMax(dps.At(i).Max()) - } - for b := 0; b < dps.At(i).Negative().BucketCounts().Len(); b++ { - negatives.SetAt(b, negatives.At(b)+dps.At(i).Negative().BucketCounts().At(b)) - } - for b := 0; b < dps.At(i).Positive().BucketCounts().Len(); b++ { - positives.SetAt(b, positives.At(b)+dps.At(i).Positive().BucketCounts().At(b)) - } - dps.At(i).Exemplars().MoveAndAppendTo(dp.Exemplars()) - if dps.At(i).StartTimestamp() < dp.StartTimestamp() { - dp.SetStartTimestamp(dps.At(i).StartTimestamp()) - } - } - } -}