Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/transform] Add functions for conversion of scalar metric types #10255

Merged
merged 13 commits into from
Jun 1, 2022
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

- `tailsamplingprocessor`: Add support for string invert matching to `and` policy (#9553)
- `mezemoexporter`: Add user agent string to outgoing HTTP requests (#10470)
- `transformprocessor`: Add functions for conversion of scalar metric types (`gauge_to_sum` and `sum_to_gauge`) (#10255)

### 🧰 Bug fixes 🧰

Expand Down
11 changes: 11 additions & 0 deletions processor/transformprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ the fields specified by the list of strings. e.g., `keep_keys(attributes, "http.

- `replace_all_matches(target, pattern, replacement)` - `target` is a path expression to a map type field, `pattern` is a string following [filepath.Match syntax](https://pkg.go.dev/path/filepath#Match), and `replacement` is a string. Each string value in `target` that matches `pattern` will get replaced with `replacement`. e.g., `replace_all_matches(attributes, "/user/*/list/*", "/user/{userId}/list/{listId}")`

Metric only functions:
- `convert_sum_to_gauge()` - Converts incoming metrics of type "Sum" to type "Gauge", retaining the metric's datapoints. Noop for metrics that are not of type "Sum".
**NOTE:** This function may cause a metric to break semantics for [Gauge metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/datamodel.md#gauge). Use at your own risk.

- `convert_gauge_to_sum(aggregation_temporality, is_monotonic)` - `aggregation_temporality` specifies the resultant metric's aggregation temporality. `aggregation_temporality` may be `"cumulative"` or `"delta"`. `is_monotonic` specifies the resultant metric's monotonicity. `is_monotonic` is a boolean. Converts incoming metrics of type "Gauge" to type "Sum", retaining the metric's datapoints and setting its aggregation temporality and monotonicity accordingly. Noop for metrics that are not of type "Gauge".
**NOTE:** This function may cause a metric to break semantics for [Sum metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/datamodel.md#sums). Use at your own risk.

Supported where operations:
- `==` - matches telemetry where the values are equal to each other
- `!=` - matches telemetry where the values are not equal to each other
Expand Down Expand Up @@ -70,6 +77,8 @@ processors:
- limit(attributes, 100)
- truncate_all(attributes, 4096)
- truncate_all(resource.attributes, 4096)
- convert_sum_to_gauge() where metric.name == "system.processes.count"
- convert_gauge_to_sum("cumulative", false) where metric.name == "prometheus_metric"
logs:
queries:
- set(severity_text, "FAIL") where body == "request failed"
Expand Down Expand Up @@ -108,6 +117,8 @@ All metrics and their data points
4) Limit all data point attributes such that each data point has no more than 100 attributes.
6) Truncate all data point attributes such that no string value has more than 4096 characters.
7) Truncate all resource attributes such that no string value has more than 4096 characters.
8) Convert all metrics with name `system.processes.count` from a Sum to Gauge.
9) Convert all metrics with name `prometheus_metric` from Gauge to a cumulative, non-monotonic Sum.

All logs

Expand Down
3 changes: 2 additions & 1 deletion processor/transformprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/service/servicetest"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
Expand Down Expand Up @@ -56,7 +57,7 @@ func TestLoadingConfig(t *testing.T) {
`keep_keys(attributes, "http.method", "http.path")`,
},

functions: traces.DefaultFunctions(),
functions: metrics.DefaultFunctions(),
},
Logs: SignalConfig{
Queries: []string{
Expand Down
3 changes: 2 additions & 1 deletion processor/transformprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
Expand All @@ -50,7 +51,7 @@ func TestFactory_CreateDefaultConfig(t *testing.T) {
Metrics: SignalConfig{
Queries: []string{},

functions: traces.DefaultFunctions(),
functions: metrics.DefaultFunctions(),
BinaryFissionGames marked this conversation as resolved.
Show resolved Hide resolved
},
Logs: SignalConfig{
Queries: []string{},
Expand Down
5 changes: 5 additions & 0 deletions processor/transformprocessor/internal/common/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,11 @@ func NewFunctionCall(inv Invocation, functions map[string]interface{}, pathParse
return nil, fmt.Errorf("invalid argument at position %v, must be a string", i)
}
args = append(args, reflect.ValueOf(*argDef.String))
case "bool":
if argDef.Bool == nil {
return nil, fmt.Errorf("invalid argument at position %v, must be a bool", i)
}
args = append(args, reflect.ValueOf(bool(*argDef.Bool)))
}
}
val := reflect.ValueOf(f)
Expand Down
10 changes: 10 additions & 0 deletions processor/transformprocessor/internal/common/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ import (
"go.uber.org/multierr"
)

// Type for capturing booleans, see:
// https://github.com/alecthomas/participle#capturing-boolean-value
type Boolean bool

func (b *Boolean) Capture(values []string) error {
BinaryFissionGames marked this conversation as resolved.
Show resolved Hide resolved
*b = values[0] == "true"
return nil
}

// ParsedQuery represents a parsed query. It is the entry point into the query DSL.
// nolint:govet
type ParsedQuery struct {
Expand Down Expand Up @@ -50,6 +59,7 @@ type Value struct {
String *string `| @String`
Float *float64 `| @Float`
Int *int64 `| @Int`
Bool *Boolean `| @("true" | "false")`
Path *Path `| @@ )`
}

Expand Down
34 changes: 34 additions & 0 deletions processor/transformprocessor/internal/common/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,40 @@ func Test_parse(t *testing.T) {
Condition: nil,
},
},
{
query: `convert_gauge_to_sum("cumulative", false)`,
expected: &ParsedQuery{
Invocation: Invocation{
Function: "convert_gauge_to_sum",
Arguments: []Value{
{
String: testhelper.Strp("cumulative"),
},
{
Bool: (*Boolean)(testhelper.Boolp(false)),
},
},
},
Condition: nil,
},
},
{
query: `convert_gauge_to_sum("cumulative", true)`,
expected: &ParsedQuery{
Invocation: Invocation{
Function: "convert_gauge_to_sum",
Arguments: []Value{
{
String: testhelper.Strp("cumulative"),
},
{
Bool: (*Boolean)(testhelper.Boolp(true)),
},
},
},
Condition: nil,
},
},
}

for _, tt := range tests {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,7 @@ func Floatp(f float64) *float64 {
func Intp(i int64) *int64 {
return &i
}

func Boolp(b bool) *bool {
return &b
}
77 changes: 75 additions & 2 deletions processor/transformprocessor/internal/metrics/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,83 @@
package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"

import (
"fmt"

"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

// registry is a map of names to functions for metrics pipelines
var registry = map[string]interface{}{
"convert_sum_to_gauge": convertSumToGauge,
"convert_gauge_to_sum": convertGaugeToSum,
}

func init() {
// Init metrics registry with default functions common to all signals
for k, v := range common.DefaultFunctions() {
registry[k] = v
}
}

func DefaultFunctions() map[string]interface{} {
// No metric-only functions yet.
return common.DefaultFunctions()
return registry
}

func convertSumToGauge() (common.ExprFunc, error) {
return func(ctx common.TransformContext) interface{} {
mtc, ok := ctx.(metricTransformContext)
if !ok {
return nil
}

metric := mtc.GetMetric()
if metric.DataType() != pmetric.MetricDataTypeSum {
return nil
}

dps := metric.Sum().DataPoints()

metric.SetDataType(pmetric.MetricDataTypeGauge)
// Setting the data type removed all the data points, so we must copy them back to the metric.
dps.CopyTo(metric.Gauge().DataPoints())
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved

return nil
}, nil
}

func convertGaugeToSum(stringAggTemp string, monotonic bool) (common.ExprFunc, error) {
var aggTemp pmetric.MetricAggregationTemporality
switch stringAggTemp {
case "delta":
aggTemp = pmetric.MetricAggregationTemporalityDelta
case "cumulative":
aggTemp = pmetric.MetricAggregationTemporalityCumulative
default:
return nil, fmt.Errorf("unknown aggregation temporality: %s", stringAggTemp)
}
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved

return func(ctx common.TransformContext) interface{} {
mtc, ok := ctx.(metricTransformContext)
if !ok {
return nil
}

metric := mtc.GetMetric()
if metric.DataType() != pmetric.MetricDataTypeGauge {
return nil
}

dps := metric.Gauge().DataPoints()

metric.SetDataType(pmetric.MetricDataTypeSum)
metric.Sum().SetAggregationTemporality(aggTemp)
metric.Sum().SetIsMonotonic(monotonic)

// Setting the data type removed all the data points, so we must copy them back to the metric.
dps.CopyTo(metric.Sum().DataPoints())

return nil
}, nil
}
Loading