Skip to content

Commit

Permalink
[processor/transform] introduce aggregate_on_attribute_value function…
Browse files Browse the repository at this point in the history
… for metrics (open-telemetry#33423)

**Link to tracking Issue:** open-telemetry#16224 

**Changes:**

- implemented `aggregate_on_attribute_value` function
- tests
- documentation

**Depends on**
open-telemetry#33669

---------

Signed-off-by: odubajDT <[email protected]>
  • Loading branch information
odubajDT authored and f7o committed Sep 12, 2024
1 parent e668f3a commit 4bf8ac9
Show file tree
Hide file tree
Showing 7 changed files with 855 additions and 1 deletion.
27 changes: 27 additions & 0 deletions .chloggen/feat_16224_aggregate_label_value.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: transformprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Support aggregating metrics based on their attribute values and substituting the values with a new value."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [16224]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
49 changes: 48 additions & 1 deletion processor/transformprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ In addition to OTTL functions, the processor defines its own functions to help w
- [copy_metric](#copy_metric)
- [scale_metric](#scale_metric)
- [aggregate_on_attributes](#aggregate_on_attributes)
- [aggregate_on_attribute_value](#aggregate_on_attribute_value)

### convert_sum_to_gauge

Expand Down Expand Up @@ -374,10 +375,12 @@ Examples:

`aggregate_on_attributes(function, Optional[attributes])`

The `aggregate_on_attributes` function aggregates all datapoints in the metric based on the supplied attributes. `function` is a case-sensitive string that represents the aggregation function and `attributes` is an optional list of attribute keys to aggregate upon.
The `aggregate_on_attributes` function aggregates all datapoints in the metric based on the supplied attributes. `function` is a case-sensitive string that represents the aggregation function and `attributes` is an optional list of attribute keys of type string to aggregate upon.

`aggregate_on_attributes` function removes all attributes that are present in datapoints except the ones that are specified in the `attributes` parameter. If `attributes` parameter is not set, all attributes are removed from datapoints. Afterwards all datapoints are aggregated depending on the attributes left (none or the ones present in the list).

**NOTE:** This function is supported only in `metric` context.

The following metric types can be aggregated:

- sum
Expand Down Expand Up @@ -415,6 +418,50 @@ statements:

To aggregate only using a specified set of attributes, you can use `keep_matching_keys`.

### aggregate_on_attribute_value

`aggregate_on_attribute_value(function, attribute, values, newValue)`

The `aggregate_on_attribute_value` function aggregates all datapoints in the metric containing the attribute `attribute` (type string) with one of the values present in the `values` parameter (list of strings) into a single datapoint where the attribute has the value `newValue` (type string). `function` is a case-sensitive string that represents the aggregation function.

**NOTE:** This function is supported only in `metric` context.

The following metric types can be aggregated:

- sum
- gauge
- histogram
- exponential histogram

Supported aggregation functions are:

- sum
- max
- min
- mean
- median
- count

**NOTE:** Only the `sum` agregation function is supported for histogram and exponential histogram datatypes.

Examples:

- `aggregate_on_attribute_value("sum", "attr1", ["val1", "val2"], "new_val") where name == "system.memory.usage"`

The `aggregate_on_attribute_value` function can also be used in conjunction with
[keep_matching_keys](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/ottlfuncs#keep_matching_keys) or
[delete_matching_keys](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/ottlfuncs#delete_matching_keys).

For example, to remove attribute keys matching a regex and aggregate the metrics on the remaining attributes, you can perform the following statement sequence:

```yaml
statements:
- delete_matching_keys(attributes, "(?i).*myRegex.*") where name == "system.memory.usage"
- aggregate_on_attribute_value("sum", "attr1", ["val1", "val2"], "new_val") where name == "system.memory.usage"
```

To aggregate only using a specified set of attributes, you can use `keep_matching_keys`.

## Examples

### Perform transformation if field does not exist
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"

import (
"context"
"fmt"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric"
)

type aggregateOnAttributeValueArguments struct {
AggregationFunction string
Attribute string
Values []string
NewValue string
}

func newAggregateOnAttributeValueFactory() ottl.Factory[ottlmetric.TransformContext] {
return ottl.NewFactory("aggregate_on_attribute_value", &aggregateOnAttributeValueArguments{}, createAggregateOnAttributeValueFunction)
}

func createAggregateOnAttributeValueFunction(_ ottl.FunctionContext, oArgs ottl.Arguments) (ottl.ExprFunc[ottlmetric.TransformContext], error) {
args, ok := oArgs.(*aggregateOnAttributeValueArguments)

if !ok {
return nil, fmt.Errorf("AggregateOnAttributeValueFactory args must be of type *AggregateOnAttributeValueArguments")
}

t, err := aggregateutil.ConvertToAggregationFunction(args.AggregationFunction)
if err != nil {
return nil, fmt.Errorf("invalid aggregation function: '%s', valid options: %s", err.Error(), aggregateutil.GetSupportedAggregationFunctionsList())
}

return AggregateOnAttributeValue(t, args.Attribute, args.Values, args.NewValue)
}

func AggregateOnAttributeValue(aggregationType aggregateutil.AggregationType, attribute string, values []string, newValue string) (ottl.ExprFunc[ottlmetric.TransformContext], error) {
return func(_ context.Context, tCtx ottlmetric.TransformContext) (any, error) {
metric := tCtx.GetMetric()

aggregateutil.RangeDataPointAttributes(metric, func(attrs pcommon.Map) bool {
val, ok := attrs.Get(attribute)
if !ok {
return true
}

for _, v := range values {
if val.Str() == v {
val.SetStr(newValue)
}
}
return true
})
ag := aggregateutil.AggGroups{}
newMetric := pmetric.NewMetric()
aggregateutil.CopyMetricDetails(metric, newMetric)
aggregateutil.GroupDataPoints(metric, &ag)
aggregateutil.MergeDataPoints(newMetric, aggregationType, ag)
newMetric.MoveTo(metric)

return nil, nil
}, nil
}
Loading

0 comments on commit 4bf8ac9

Please sign in to comment.