Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/transform] Add new Context-specific configuration options #15381

Merged
merged 32 commits into from
Nov 10, 2022
Merged
Changes from 26 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
b858f59
Add new Context-specific configuration options
TylerHelmuth Oct 20, 2022
07457a8
Add ContextStatements config
TylerHelmuth Oct 21, 2022
cd058e7
Add high-level context
TylerHelmuth Oct 25, 2022
5f8471c
Add high-level context
TylerHelmuth Oct 25, 2022
2da5d8a
Merge branch 'open-telemetry:main' into tp-enhanced-contexts-2
TylerHelmuth Oct 26, 2022
ffc1fee
respond to feedback
TylerHelmuth Oct 27, 2022
38b7d9c
Fix merge
TylerHelmuth Oct 27, 2022
9133edf
Merge branch 'open-telemetry:main' into tp-enhanced-contexts-2
TylerHelmuth Oct 27, 2022
cab0096
Merge branch 'open-telemetry:main' into tp-enhanced-contexts-2
TylerHelmuth Oct 28, 2022
edfe1a4
Adjust ParserCollection
TylerHelmuth Oct 31, 2022
cdf7c7e
Add example usage temporarily
TylerHelmuth Oct 31, 2022
a19bd17
apply feedback
TylerHelmuth Oct 31, 2022
14d60b1
Split into individual files
TylerHelmuth Nov 1, 2022
7e044a4
merge in upstream/main
TylerHelmuth Nov 2, 2022
8950da0
Add spanevent and metric contexts
TylerHelmuth Nov 2, 2022
438ce15
revert logs processor example
TylerHelmuth Nov 2, 2022
b725cfb
Merge in updates
TylerHelmuth Nov 2, 2022
1371a6e
merge in upstream/main
TylerHelmuth Nov 8, 2022
85045f5
Fix lint
TylerHelmuth Nov 8, 2022
1a83c26
run make gotidy
TylerHelmuth Nov 8, 2022
335d124
fix impi
TylerHelmuth Nov 8, 2022
4c1dd6b
Update readme
TylerHelmuth Nov 8, 2022
8ce73a0
merge in upstream/main
TylerHelmuth Nov 8, 2022
7cc39ed
add changelog entry
TylerHelmuth Nov 8, 2022
caf0889
Merge remote-tracking branch 'upstream/main' into tp-enhanced-contexts
TylerHelmuth Nov 8, 2022
342d086
merge in upstream/main
TylerHelmuth Nov 9, 2022
c3f163b
Update processor/transformprocessor/README.md
TylerHelmuth Nov 9, 2022
ad39509
Add test cases that reuse context
TylerHelmuth Nov 9, 2022
3d7e974
Merge branch 'tp-enhanced-contexts' of https://github.com/TylerHelmut…
TylerHelmuth Nov 9, 2022
6e7426a
Update processor/transformprocessor/README.md
TylerHelmuth Nov 9, 2022
a939062
Apply feedback
TylerHelmuth Nov 9, 2022
1ad3e04
Update processor/transformprocessor/README.md
TylerHelmuth Nov 10, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .chloggen/tp-enhanced-contexts.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/transform

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds new configuration options that allow specifying the OTTL context to use when executing statements. See [Transform Processor README](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor#config) for more details.

# One or more tracking issues related to the change
issues: [15381]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The existing configuration options will be deprecated in a future release.
150 changes: 115 additions & 35 deletions processor/transformprocessor/README.md
Original file line number Diff line number Diff line change
@@ -8,66 +8,146 @@
| Warnings | [Unsound Transformations, Identity Conflict, Orphaned Telemetry, Other](#warnings) |

The transform processor modifies telemetry based on configuration using the [OpenTelemetry Transformation Language](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl).
The processor takes a list of statements for each signal type and executes the statements against the incoming telemetry in the order specified in the config. Each statement can access and transform telemetry using functions and allow the use of a condition to help decide whether the function should be executed.

For each signal type, the processor takes a list of statements associated to a [Context type](#contexts) and executes the statements against the incoming telemetry in the order specified in the config.
Each statement can access and transform telemetry using functions and allow the use of a condition to help decide whether the function should be executed.

## Config

The transform processor allows configuring statements for traces, metrics, and logs. Each signal specifies a list of string statements that get passed to the OTTL for interpretation.
The transform processor allows configuring multiple context statements for traces, metrics, and logs.
The value of `context` specifies which [OTTL Context](#contexts) to use when interpreting the associated statements.
The statement strings, which must be OTTL compatible, will be passed to the OTTL and interpreted using the associated context.
Each context will be processed in the order specified and each statement for a context will be executed in the order specified.

```yaml
transform:
<traces|metrics|logs>:
statements:
- string
- string
- string
<trace|metric|log>_statements:
- context: string
statements:
- string
- string
- string
- context: string
statements:
- string
- string
- string
```
Proper use of contexts will provide increased performance and capabilities. See [Contexts](#contexts) for more details.
Valid values for `context` are:

| Signal | Context Values |
|-------------------|------------------------------------------------|
| trace_statements | `resrouce`, `scope`, `trace`, and `spanevent` |
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
| metric_statements | `resrouce`, `scope`, `metric`, and `datapoint` |
| log_statements | `resrouce`, `scope`, and `log` |
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved

## Example

The example takes advantage of context efficiency by grouping transformations with the context which it indents to transform.
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved
See [Contexts](#contexts) for more details.

Example configuration:
```yaml
transform:
traces:
statements:
- set(status.code, 1) where attributes["http.path"] == "/health"
- keep_keys(resource.attributes, ["service.name", "service.namespace", "cloud.region", "process.command_line"])
- set(name, attributes["http.route"])
- replace_match(attributes["http.target"], "/user/*/list/*", "/user/{userId}/list/{listId}")
- replace_pattern(resource.attributes["process.command_line"], "password\\=[^\\s]*(\\s?)", "password=***")
- limit(attributes, 100, [])
- limit(resource.attributes, 100, [])
trace_statements:
- context: resource
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
statements:
- keep_keys(attributes, ["service.name", "service.namespace", "cloud.region", "process.command_line"])
- replace_pattern(attributes["process.command_line"], "password\\=[^\\s]*(\\s?)", "password=***")
- limit(attributes, 100, [])
- truncate_all(attributes, 4096)
- context: trace
statements:
- set(status.code, 1) where attributes["http.path"] == "/health"
- set(name, attributes["http.route"])
- replace_match(attributes["http.target"], "/user/*/list/*", "/user/{userId}/list/{listId}")
- limit(attributes, 100, [])
- truncate_all(attributes, 4096)
metric_statements:
- context: resource
statements:
- keep_keys(attributes, ["host.name"])
- truncate_all(attributes, 4096)
- truncate_all(resource.attributes, 4096)
metrics:
statements:
- set(metric.description, "Sum") where metric.type == "Sum"
- keep_keys(resource.attributes, ["host.name"])
- limit(attributes, 100, ["host.name"])
- truncate_all(attributes, 4096)
- truncate_all(resource.attributes, 4096)
- convert_sum_to_gauge() where metric.name == "system.processes.count"
- convert_gauge_to_sum("cumulative", false) where metric.name == "prometheus_metric"
logs:
statements:
- set(severity_text, "FAIL") where body == "request failed"
- replace_all_matches(attributes, "/user/*/list/*", "/user/{userId}/list/{listId}")
- replace_all_patterns(attributes, "/account/\\d{4}", "/account/{accountId}")
- set(body, attributes["http.route"])
- keep_keys(resource.attributes, ["service.name", "service.namespace", "cloud.region"])
- context: metric
statements:
- set(description, "Sum") where type == "Sum"
- context: datapoint
statements:
- limit(attributes, 100, ["host.name"])
- truncate_all(attributes, 4096)
- convert_sum_to_gauge() where metric.name == "system.processes.count"
- convert_gauge_to_sum("cumulative", false) where metric.name == "prometheus_metric"
log_statements:
- context: resource
statements:
- keep_keys(resource.attributes, ["service.name", "service.namespace", "cloud.region"])
- context: log
statements:
- set(severity_text, "FAIL") where body == "request failed"
- replace_all_matches(attributes, "/user/*/list/*", "/user/{userId}/list/{listId}")
- replace_all_patterns(attributes, "/account/\\d{4}", "/account/{accountId}")
- set(body, attributes["http.route"])
```

## Grammar

You can learn more in-depth details on the capabilities and limitations of the OpenTelemetry Transformation Language used by the transform processor by reading about its [grammar](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl#grammar).

## Contexts

The transform processor utilizes the OTTL's standard contexts for Traces, Metrics and Logs. The contexts allow the OTTL to interact with the underlying telemetry data in its pdata form.
The transform processor utilizes the OTTL's contexts to transform Resource, Scope, Trace, SpanEvent, Metric, DataPoint, and Log telemetry.
The contexts allow the OTTL to interact with the underlying telemetry data in its pdata form.

- [Resource Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottlresource)
- [Scope Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottlscope)
- [Traces Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottltraces)
- [Metrics Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottldatapoints)
- [SpanEvent Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottlspanevent)
- [Metric Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottlmetric)
- [DataPoint Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottldatapoints)
- [Logs Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottllogs)

Each context allows transformation of its type of telemetry.
For example, statements associated to a `resource` context will be able to transform the resource's `attributes` and `dropped_attributes_count`.

Contexts __NEVER__ supply access to individual items "lower" in the protobuf definition.
- This means statements associated to a `resource` __WILL NOT__ be able to access the underlying instrumentation scopes.
- This means statements associated to a `scope` __WILL NOT__ be able to access the underlying telemetry slices (spans, metrics, or logs).
- Similarly, statements associated to a `metric` __WILL NOT__ be able to access individual datapoints, but can access the entire datapoints slice.
- Similarly, statements associated to a `trace` __WILL NOT__ be able to access individual SpanEvents, but can access the entire SpanEvents slice.

For practical purposes, this means that a context cannot make decisions on its telemetry based on telemetry "lower" in the structure.
For example, __the following context statement is not possible__ because it attempts to use individual datapoint attributes in the condition of a statements that is associated to a `metric`

```yaml
metric_statements:
- context: metric
statements:
- set(description, "test passed") where datapoints.attributes["test"] == "pass"
```

Context __ALWAYS__ supply access to the items "higher" in the protobuf definition that are associated to the telemetry being transformed.
- This means that statements associated to a `datapoint` have access to a datapoint's metric, instrumentation scope, and resource.
- This means that statements associated to a `spanevent` have access to a spanevent's span, instrumentation scope, and resource.
- This means that statements associated to a `trace`/`metric`/`log` have access to the telemetry's instrumentation scope, and resource.
- This means that statements associated to a `scope` have access to the scope's resource.

For example, __the following context statement is possible__ because `datapoint` statements can access the datapoint's metric.

```yaml
metric_statements:
- context: datapoint
statements:
- set(metric.description, "test passed") where attributes["test"] == "pass"
```

Whenever possible, associate your statements to the context that the statement intend to transform.
Although you can modify resource attributes associated to a span using the `trace` context, it is more efficient to use the `resource` context.
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved

## Supported functions:

Since the transform processor utilizes the OTTL's contexts for Traces, Metrics, and Logs, it is able to utilize functions that expect pdata in addition to any common functions. These common functions can be used for any signal.
86 changes: 71 additions & 15 deletions processor/transformprocessor/config.go
Original file line number Diff line number Diff line change
@@ -15,14 +15,16 @@
package transformprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor"

import (
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoints"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllogs"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottltraces"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"
@@ -31,6 +33,11 @@ import (
type Config struct {
config.ProcessorSettings `mapstructure:",squash"`

TraceStatements []common.ContextStatements `mapstructure:"trace_statements"`
MetricStatements []common.ContextStatements `mapstructure:"metric_statements"`
LogStatements []common.ContextStatements `mapstructure:"log_statements"`

// Deprecated. Use TraceStatements, MetricStatements, and LogStatements instead
OTTLConfig `mapstructure:",squash"`
}

@@ -47,24 +54,73 @@ type SignalConfig struct {
var _ component.ProcessorConfig = (*Config)(nil)

func (c *Config) Validate() error {
var errors error
if (len(c.Traces.Statements) > 0 || len(c.Metrics.Statements) > 0 || len(c.Logs.Statements) > 0) &&
(len(c.TraceStatements) > 0 || len(c.MetricStatements) > 0 || len(c.LogStatements) > 0) {
return fmt.Errorf("cannot use Traces, Metrics and/or Logs with TraceStatements, MetricStatements and/or LogStatements")
}

if len(c.Traces.Statements) > 0 {
ottltracesp := ottltraces.NewParser(traces.Functions(), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := ottltracesp.ParseStatements(c.Traces.Statements)
if err != nil {
return err
}
}

ottltracesp := ottltraces.NewParser(traces.Functions(), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := ottltracesp.ParseStatements(c.Traces.Statements)
if err != nil {
errors = multierr.Append(errors, err)
if len(c.TraceStatements) > 0 {
pc, err := common.NewTraceParserCollection(traces.Functions(), component.TelemetrySettings{Logger: zap.NewNop()})
if err != nil {
return err
}
for _, cs := range c.TraceStatements {
_, err = pc.ParseContextStatements(cs)
if err != nil {
return err
}
}
}

ottlmetricsp := ottldatapoints.NewParser(metrics.Functions(), component.TelemetrySettings{Logger: zap.NewNop()})
_, err = ottlmetricsp.ParseStatements(c.Metrics.Statements)
if err != nil {
errors = multierr.Append(errors, err)
if len(c.Metrics.Statements) > 0 {
ottlmetricsp := ottldatapoints.NewParser(metrics.Functions(), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := ottlmetricsp.ParseStatements(c.Metrics.Statements)
if err != nil {
return err
}
}

ottllogsp := ottllogs.NewParser(logs.Functions(), component.TelemetrySettings{Logger: zap.NewNop()})
_, err = ottllogsp.ParseStatements(c.Logs.Statements)
if err != nil {
errors = multierr.Append(errors, err)
if len(c.MetricStatements) > 0 {
pc, err := common.NewMetricParserCollection(metrics.Functions(), component.TelemetrySettings{Logger: zap.NewNop()})
if err != nil {
return err
}
for _, cs := range c.MetricStatements {
_, err = pc.ParseContextStatements(cs)
if err != nil {
return err
}
}
}
return errors

if len(c.Logs.Statements) > 0 {
ottllogsp := ottllogs.NewParser(logs.Functions(), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := ottllogsp.ParseStatements(c.Logs.Statements)
if err != nil {
return err
}
}

if len(c.LogStatements) > 0 {
pc, err := common.NewLogParserCollection(logs.Functions(), component.TelemetrySettings{Logger: zap.NewNop()})
if err != nil {
return err
}
for _, cs := range c.LogStatements {
_, err = pc.ParseContextStatements(cs)
if err != nil {
return err
}
}
}

return nil
}
93 changes: 88 additions & 5 deletions processor/transformprocessor/config_test.go
Original file line number Diff line number Diff line change
@@ -19,10 +19,11 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/confmap/confmaptest"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

func TestLoadConfig(t *testing.T) {
@@ -37,6 +38,68 @@ func TestLoadConfig(t *testing.T) {
id: component.NewIDWithName(typeStr, ""),
expected: &Config{
ProcessorSettings: config.NewProcessorSettings(component.NewID(typeStr)),
OTTLConfig: OTTLConfig{
Traces: SignalConfig{
Statements: []string{},
},
Metrics: SignalConfig{
Statements: []string{},
},
Logs: SignalConfig{
Statements: []string{},
},
},
TraceStatements: []common.ContextStatements{
{
Context: "trace",
Statements: []string{
`set(name, "bear") where attributes["http.path"] == "/animal"`,
`keep_keys(attributes, ["http.method", "http.path"])`,
},
},
{
Context: "resource",
Statements: []string{
`set(attributes["name"], "bear")`,
},
},
},
MetricStatements: []common.ContextStatements{
{
Context: "datapoint",
Statements: []string{
`set(metric.name, "bear") where attributes["http.path"] == "/animal"`,
`keep_keys(attributes, ["http.method", "http.path"])`,
},
},
{
Context: "resource",
Statements: []string{
`set(attributes["name"], "bear")`,
},
},
},
LogStatements: []common.ContextStatements{
{
Context: "log",
Statements: []string{
`set(body, "bear") where attributes["http.path"] == "/animal"`,
`keep_keys(attributes, ["http.method", "http.path"])`,
},
},
{
Context: "resource",
Statements: []string{
`set(attributes["name"], "bear")`,
},
},
},
},
},
{
id: component.NewIDWithName(typeStr, "deprecated_format"),
expected: &Config{
ProcessorSettings: config.NewProcessorSettings(component.NewIDWithName(typeStr, "")),
OTTLConfig: OTTLConfig{
Traces: SignalConfig{
Statements: []string{
@@ -57,8 +120,15 @@ func TestLoadConfig(t *testing.T) {
},
},
},
TraceStatements: []common.ContextStatements{},
MetricStatements: []common.ContextStatements{},
LogStatements: []common.ContextStatements{},
},
},
{
id: component.NewIDWithName(typeStr, "using_both_formats"),
errorMessage: "cannot use Traces, Metrics and/or Logs with TraceStatements, MetricStatements and/or LogStatements",
},
{
id: component.NewIDWithName(typeStr, "bad_syntax_trace"),
errorMessage: "1:18: unexpected token \"where\" (expected \")\")",
@@ -67,7 +137,6 @@ func TestLoadConfig(t *testing.T) {
id: component.NewIDWithName(typeStr, "unknown_function_trace"),
errorMessage: "undefined function not_a_function",
},

{
id: component.NewIDWithName(typeStr, "bad_syntax_metric"),
errorMessage: "1:18: unexpected token \"where\" (expected \")\")",
@@ -88,14 +157,14 @@ func TestLoadConfig(t *testing.T) {
for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)
assert.NoError(t, err)

factory := NewFactory()
cfg := factory.CreateDefaultConfig()

sub, err := cm.Sub(tt.id.String())
require.NoError(t, err)
require.NoError(t, component.UnmarshalProcessorConfig(sub, cfg))
assert.NoError(t, err)
assert.NoError(t, component.UnmarshalProcessorConfig(sub, cfg))

if tt.expected == nil {
assert.EqualError(t, cfg.Validate(), tt.errorMessage)
@@ -106,3 +175,17 @@ func TestLoadConfig(t *testing.T) {
})
}
}

func Test_UnknownContextID(t *testing.T) {
id := component.NewIDWithName(typeStr, "unknown_context")

cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
assert.NoError(t, err)

factory := NewFactory()
cfg := factory.CreateDefaultConfig()

sub, err := cm.Sub(id.String())
assert.NoError(t, err)
assert.Error(t, component.UnmarshalProcessorConfig(sub, cfg))
}
10 changes: 7 additions & 3 deletions processor/transformprocessor/factory.go
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor/processorhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"
@@ -59,6 +60,9 @@ func createDefaultConfig() component.ProcessorConfig {
Statements: []string{},
},
},
TraceStatements: []common.ContextStatements{},
MetricStatements: []common.ContextStatements{},
LogStatements: []common.ContextStatements{},
}
}

@@ -70,7 +74,7 @@ func createLogsProcessor(
) (component.LogsProcessor, error) {
oCfg := cfg.(*Config)

proc, err := logs.NewProcessor(oCfg.Logs.Statements, set.TelemetrySettings)
proc, err := logs.NewProcessor(oCfg.Logs.Statements, oCfg.LogStatements, set.TelemetrySettings)
if err != nil {
return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err)
}
@@ -91,7 +95,7 @@ func createTracesProcessor(
) (component.TracesProcessor, error) {
oCfg := cfg.(*Config)

proc, err := traces.NewProcessor(oCfg.Traces.Statements, set.TelemetrySettings)
proc, err := traces.NewProcessor(oCfg.Traces.Statements, oCfg.TraceStatements, set.TelemetrySettings)
if err != nil {
return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err)
}
@@ -112,7 +116,7 @@ func createMetricsProcessor(
) (component.MetricsProcessor, error) {
oCfg := cfg.(*Config)

proc, err := metrics.NewProcessor(oCfg.Metrics.Statements, set.TelemetrySettings)
proc, err := metrics.NewProcessor(oCfg.Metrics.Statements, oCfg.MetricStatements, set.TelemetrySettings)
if err != nil {
return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err)
}
5 changes: 5 additions & 0 deletions processor/transformprocessor/factory_test.go
Original file line number Diff line number Diff line change
@@ -26,6 +26,8 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

func TestFactory_Type(t *testing.T) {
@@ -49,6 +51,9 @@ func TestFactory_CreateDefaultConfig(t *testing.T) {
Statements: []string{},
},
},
TraceStatements: []common.ContextStatements{},
MetricStatements: []common.ContextStatements{},
LogStatements: []common.ContextStatements{},
})
assert.NoError(t, componenttest.CheckConfigStruct(cfg))
}
2 changes: 1 addition & 1 deletion processor/transformprocessor/go.mod
Original file line number Diff line number Diff line change
@@ -7,7 +7,6 @@ require (
github.com/stretchr/testify v1.8.1
go.opentelemetry.io/collector v0.64.0
go.opentelemetry.io/collector/pdata v0.64.0
go.uber.org/multierr v1.8.0
go.uber.org/zap v1.23.0
)

@@ -35,6 +34,7 @@ require (
go.opentelemetry.io/otel/metric v0.33.0 // indirect
go.opentelemetry.io/otel/trace v1.11.1 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 // indirect
67 changes: 50 additions & 17 deletions processor/transformprocessor/internal/logs/processor.go
Original file line number Diff line number Diff line change
@@ -18,43 +18,76 @@ import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllogs"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

type Processor struct {
contexts []consumer.Logs
// Deprecated. Use contexts instead
statements []*ottl.Statement[ottllogs.TransformContext]
}

func NewProcessor(statements []string, settings component.TelemetrySettings) (*Processor, error) {
ottlp := ottllogs.NewParser(Functions(), settings)
parsedStatements, err := ottlp.ParseStatements(statements)
func NewProcessor(statements []string, contextStatements []common.ContextStatements, settings component.TelemetrySettings) (*Processor, error) {
if len(statements) > 0 {
ottlp := ottllogs.NewParser(Functions(), settings)
parsedStatements, err := ottlp.ParseStatements(statements)
if err != nil {
return nil, err
}
return &Processor{
statements: parsedStatements,
}, nil
}

pc, err := common.NewLogParserCollection(Functions(), settings)
if err != nil {
return nil, err
}

contexts := make([]consumer.Logs, len(contextStatements))
for i, cs := range contextStatements {
context, err := pc.ParseContextStatements(cs)
if err != nil {
return nil, err
}
contexts[i] = context
}

return &Processor{
statements: parsedStatements,
contexts: contexts,
}, nil
}

func (p *Processor) ProcessLogs(ctx context.Context, td plog.Logs) (plog.Logs, error) {
for i := 0; i < td.ResourceLogs().Len(); i++ {
rlogs := td.ResourceLogs().At(i)
for j := 0; j < rlogs.ScopeLogs().Len(); j++ {
slogs := rlogs.ScopeLogs().At(j)
logs := slogs.LogRecords()
for k := 0; k < logs.Len(); k++ {
tCtx := ottllogs.NewTransformContext(logs.At(k), slogs.Scope(), rlogs.Resource())
for _, statement := range p.statements {
_, _, err := statement.Execute(ctx, tCtx)
if err != nil {
return td, err
func (p *Processor) ProcessLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) {
if len(p.statements) > 0 {
for i := 0; i < ld.ResourceLogs().Len(); i++ {
rlogs := ld.ResourceLogs().At(i)
for j := 0; j < rlogs.ScopeLogs().Len(); j++ {
slogs := rlogs.ScopeLogs().At(j)
logs := slogs.LogRecords()
for k := 0; k < logs.Len(); k++ {
tCtx := ottllogs.NewTransformContext(logs.At(k), slogs.Scope(), rlogs.Resource())
for _, statement := range p.statements {
_, _, err := statement.Execute(ctx, tCtx)
if err != nil {
return ld, err
}
}
}
}
}
} else {
for _, c := range p.contexts {
err := c.ConsumeLogs(ctx, ld)
if err != nil {
return ld, err
}
}
}
return td, nil
return ld, nil
}
166 changes: 164 additions & 2 deletions processor/transformprocessor/internal/logs/processor_test.go
Original file line number Diff line number Diff line change
@@ -23,6 +23,8 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

var (
@@ -36,7 +38,77 @@ var (
spanID = [8]byte{1, 2, 3, 4, 5, 6, 7, 8}
)

func TestProcess(t *testing.T) {
func Test_ProcessLogs_ResourceContext(t *testing.T) {
tests := []struct {
statement string
want func(td plog.Logs)
}{
{
statement: `set(attributes["test"], "pass")`,
want: func(td plog.Logs) {
td.ResourceLogs().At(0).Resource().Attributes().PutStr("test", "pass")
},
},
{
statement: `set(attributes["test"], "pass") where attributes["host.name"] == "wrong"`,
want: func(td plog.Logs) {
},
},
}

for _, tt := range tests {
t.Run(tt.statement, func(t *testing.T) {
td := constructLogs()
processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "resource", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)

_, err = processor.ProcessLogs(context.Background(), td)
assert.NoError(t, err)

exTd := constructLogs()
tt.want(exTd)

assert.Equal(t, exTd, td)
})
}
}

func Test_ProcessLogs_ScopeContext(t *testing.T) {
tests := []struct {
statement string
want func(td plog.Logs)
}{
{
statement: `set(attributes["test"], "pass") where name == "scope"`,
want: func(td plog.Logs) {
td.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Attributes().PutStr("test", "pass")
},
},
{
statement: `set(attributes["test"], "pass") where version == 2`,
want: func(td plog.Logs) {
},
},
}

for _, tt := range tests {
t.Run(tt.statement, func(t *testing.T) {
td := constructLogs()
processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "scope", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)

_, err = processor.ProcessLogs(context.Background(), td)
assert.NoError(t, err)

exTd := constructLogs()
tt.want(exTd)

assert.Equal(t, exTd, td)
})
}
}

func Test_ProcessLogs_LogContext(t *testing.T) {
tests := []struct {
statement string
want func(td plog.Logs)
@@ -198,7 +270,96 @@ func TestProcess(t *testing.T) {
for _, tt := range tests {
t.Run(tt.statement, func(t *testing.T) {
td := constructLogs()
processor, err := NewProcessor([]string{tt.statement}, componenttest.NewNopTelemetrySettings())
processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "log", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)

_, err = processor.ProcessLogs(context.Background(), td)
assert.NoError(t, err)

exTd := constructLogs()
tt.want(exTd)

assert.Equal(t, exTd, td)
})
}
}

func Test_ProcessLogs_MixContext(t *testing.T) {
tests := []struct {
name string
contextStatments []common.ContextStatements
want func(td plog.Logs)
}{
{
name: "set resource and then use",
contextStatments: []common.ContextStatements{
{
Context: "resource",
Statements: []string{
`set(attributes["test"], "pass")`,
},
},
{
Context: "log",
Statements: []string{
`set(attributes["test"], "pass") where resource.attributes["test"] == "pass"`,
},
},
},
want: func(td plog.Logs) {
td.ResourceLogs().At(0).Resource().Attributes().PutStr("test", "pass")
td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "pass")
td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Attributes().PutStr("test", "pass")
},
},
{
name: "set scope and then use",
contextStatments: []common.ContextStatements{
{
Context: "scope",
Statements: []string{
`set(attributes["test"], "pass")`,
},
},
{
Context: "log",
Statements: []string{
`set(attributes["test"], "pass") where instrumentation_scope.attributes["test"] == "pass"`,
},
},
},
want: func(td plog.Logs) {
td.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Attributes().PutStr("test", "pass")
td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "pass")
td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Attributes().PutStr("test", "pass")
},
},
{
name: "order matters",
contextStatments: []common.ContextStatements{
{
Context: "log",
Statements: []string{
`set(attributes["test"], "pass") where instrumentation_scope.attributes["test"] == "pass"`,
},
},
{
Context: "scope",
Statements: []string{
`set(attributes["test"], "pass")`,
},
},
},
want: func(td plog.Logs) {
td.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Attributes().PutStr("test", "pass")
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
td := constructLogs()
processor, err := NewProcessor(nil, tt.contextStatments, componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)

_, err = processor.ProcessLogs(context.Background(), td)
@@ -217,6 +378,7 @@ func constructLogs() plog.Logs {
rs0 := td.ResourceLogs().AppendEmpty()
rs0.Resource().Attributes().PutStr("host.name", "localhost")
rs0ils0 := rs0.ScopeLogs().AppendEmpty()
rs0ils0.Scope().SetName("scope")
fillLogOne(rs0ils0.LogRecords().AppendEmpty())
fillLogTwo(rs0ils0.LogRecords().AppendEmpty())
return td
89 changes: 61 additions & 28 deletions processor/transformprocessor/internal/metrics/processor.go
Original file line number Diff line number Diff line change
@@ -18,56 +18,89 @@ import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoints"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

type Processor struct {
contexts []consumer.Metrics
// Deprecated. Use contexts instead
statements []*ottl.Statement[ottldatapoints.TransformContext]
}

func NewProcessor(statements []string, settings component.TelemetrySettings) (*Processor, error) {
ottlp := ottldatapoints.NewParser(Functions(), settings)
parsedStatements, err := ottlp.ParseStatements(statements)
func NewProcessor(statements []string, contextStatements []common.ContextStatements, settings component.TelemetrySettings) (*Processor, error) {
if len(statements) > 0 {
ottlp := ottldatapoints.NewParser(Functions(), settings)
parsedStatements, err := ottlp.ParseStatements(statements)
if err != nil {
return nil, err
}
return &Processor{
statements: parsedStatements,
}, nil
}

pc, err := common.NewMetricParserCollection(Functions(), settings)
if err != nil {
return nil, err
}

contexts := make([]consumer.Metrics, len(contextStatements))
for i, cs := range contextStatements {
context, err := pc.ParseContextStatements(cs)
if err != nil {
return nil, err
}
contexts[i] = context
}

return &Processor{
statements: parsedStatements,
contexts: contexts,
}, nil
}

func (p *Processor) ProcessMetrics(ctx context.Context, td pmetric.Metrics) (pmetric.Metrics, error) {
for i := 0; i < td.ResourceMetrics().Len(); i++ {
rmetrics := td.ResourceMetrics().At(i)
for j := 0; j < rmetrics.ScopeMetrics().Len(); j++ {
smetrics := rmetrics.ScopeMetrics().At(j)
metrics := smetrics.Metrics()
for k := 0; k < metrics.Len(); k++ {
metric := metrics.At(k)
var err error
switch metric.Type() {
case pmetric.MetricTypeSum:
err = p.handleNumberDataPoints(ctx, metric.Sum().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource())
case pmetric.MetricTypeGauge:
err = p.handleNumberDataPoints(ctx, metric.Gauge().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource())
case pmetric.MetricTypeHistogram:
err = p.handleHistogramDataPoints(ctx, metric.Histogram().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource())
case pmetric.MetricTypeExponentialHistogram:
err = p.handleExponetialHistogramDataPoints(ctx, metric.ExponentialHistogram().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource())
case pmetric.MetricTypeSummary:
err = p.handleSummaryDataPoints(ctx, metric.Summary().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource())
}
if err != nil {
return td, err
func (p *Processor) ProcessMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
if len(p.statements) > 0 {
for i := 0; i < md.ResourceMetrics().Len(); i++ {
rmetrics := md.ResourceMetrics().At(i)
for j := 0; j < rmetrics.ScopeMetrics().Len(); j++ {
smetrics := rmetrics.ScopeMetrics().At(j)
metrics := smetrics.Metrics()
for k := 0; k < metrics.Len(); k++ {
metric := metrics.At(k)
var err error
switch metric.Type() {
case pmetric.MetricTypeSum:
err = p.handleNumberDataPoints(ctx, metric.Sum().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource())
case pmetric.MetricTypeGauge:
err = p.handleNumberDataPoints(ctx, metric.Gauge().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource())
case pmetric.MetricTypeHistogram:
err = p.handleHistogramDataPoints(ctx, metric.Histogram().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource())
case pmetric.MetricTypeExponentialHistogram:
err = p.handleExponetialHistogramDataPoints(ctx, metric.ExponentialHistogram().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource())
case pmetric.MetricTypeSummary:
err = p.handleSummaryDataPoints(ctx, metric.Summary().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource())
}
if err != nil {
return md, err
}
}
}
}
} else {
for _, c := range p.contexts {
err := c.ConsumeMetrics(ctx, md)
if err != nil {
return md, err
}
}
}
return td, nil
return md, nil
}

func (p *Processor) handleNumberDataPoints(ctx context.Context, dps pmetric.NumberDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error {
176 changes: 174 additions & 2 deletions processor/transformprocessor/internal/metrics/processor_test.go
Original file line number Diff line number Diff line change
@@ -23,6 +23,8 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

var (
@@ -32,7 +34,77 @@ var (
TestTimeStamp = pcommon.NewTimestampFromTime(StartTime)
)

func TestProcess(t *testing.T) {
func Test_ProcessMetrics_ResourceContext(t *testing.T) {
tests := []struct {
statement string
want func(td pmetric.Metrics)
}{
{
statement: `set(attributes["test"], "pass")`,
want: func(td pmetric.Metrics) {
td.ResourceMetrics().At(0).Resource().Attributes().PutStr("test", "pass")
},
},
{
statement: `set(attributes["test"], "pass") where attributes["host.name"] == "wrong"`,
want: func(td pmetric.Metrics) {
},
},
}

for _, tt := range tests {
t.Run(tt.statement, func(t *testing.T) {
td := constructMetrics()
processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "resource", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)

_, err = processor.ProcessMetrics(context.Background(), td)
assert.NoError(t, err)

exTd := constructMetrics()
tt.want(exTd)

assert.Equal(t, exTd, td)
})
}
}

func Test_ProcessMetrics_ScopeContext(t *testing.T) {
tests := []struct {
statement string
want func(td pmetric.Metrics)
}{
{
statement: `set(attributes["test"], "pass") where name == "scope"`,
want: func(td pmetric.Metrics) {
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Attributes().PutStr("test", "pass")
},
},
{
statement: `set(attributes["test"], "pass") where version == 2`,
want: func(td pmetric.Metrics) {
},
},
}

for _, tt := range tests {
t.Run(tt.statement, func(t *testing.T) {
td := constructMetrics()
processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "scope", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)

_, err = processor.ProcessMetrics(context.Background(), td)
assert.NoError(t, err)

exTd := constructMetrics()
tt.want(exTd)

assert.Equal(t, exTd, td)
})
}
}

func Test_ProcessMetrics_DataPointContext(t *testing.T) {
tests := []struct {
statements []string
want func(pmetric.Metrics)
@@ -371,7 +443,106 @@ func TestProcess(t *testing.T) {
for _, tt := range tests {
t.Run(tt.statements[0], func(t *testing.T) {
td := constructMetrics()
processor, err := NewProcessor(tt.statements, componenttest.NewNopTelemetrySettings())
processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "datapoint", Statements: tt.statements}}, componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)

_, err = processor.ProcessMetrics(context.Background(), td)
assert.NoError(t, err)

exTd := constructMetrics()
tt.want(exTd)

assert.Equal(t, exTd, td)
})
}
}

func Test_ProcessMetrics_MixContext(t *testing.T) {
tests := []struct {
name string
contextStatments []common.ContextStatements
want func(td pmetric.Metrics)
}{
{
name: "set resource and then use",
contextStatments: []common.ContextStatements{
{
Context: "resource",
Statements: []string{
`set(attributes["test"], "pass")`,
},
},
{
Context: "datapoint",
Statements: []string{
`set(attributes["test"], "pass") where resource.attributes["test"] == "pass"`,
},
},
},
want: func(td pmetric.Metrics) {
td.ResourceMetrics().At(0).Resource().Attributes().PutStr("test", "pass")
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("test", "pass")
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("test", "pass")
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(0).Attributes().PutStr("test", "pass")
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(1).Attributes().PutStr("test", "pass")
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("test", "pass")
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("test", "pass")
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("test", "pass")
},
},
{
name: "set scope and then use",
contextStatments: []common.ContextStatements{
{
Context: "scope",
Statements: []string{
`set(attributes["test"], "pass")`,
},
},
{
Context: "datapoint",
Statements: []string{
`set(attributes["test"], "pass") where instrumentation_scope.attributes["test"] == "pass"`,
},
},
},
want: func(td pmetric.Metrics) {
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Attributes().PutStr("test", "pass")
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("test", "pass")
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("test", "pass")
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(0).Attributes().PutStr("test", "pass")
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(1).Attributes().PutStr("test", "pass")
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("test", "pass")
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("test", "pass")
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("test", "pass")
},
},
{
name: "order matters",
contextStatments: []common.ContextStatements{
{
Context: "datapoint",
Statements: []string{
`set(attributes["test"], "pass") where instrumentation_scope.attributes["test"] == "pass"`,
},
},
{
Context: "scope",
Statements: []string{
`set(attributes["test"], "pass")`,
},
},
},
want: func(td pmetric.Metrics) {
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Attributes().PutStr("test", "pass")
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
td := constructMetrics()
processor, err := NewProcessor(nil, tt.contextStatments, componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)

_, err = processor.ProcessMetrics(context.Background(), td)
@@ -390,6 +561,7 @@ func constructMetrics() pmetric.Metrics {
rm0 := td.ResourceMetrics().AppendEmpty()
rm0.Resource().Attributes().PutStr("host.name", "myhost")
rm0ils0 := rm0.ScopeMetrics().AppendEmpty()
rm0ils0.Scope().SetName("scope")
fillMetricOne(rm0ils0.Metrics().AppendEmpty())
fillMetricTwo(rm0ils0.Metrics().AppendEmpty())
fillMetricThree(rm0ils0.Metrics().AppendEmpty())
63 changes: 48 additions & 15 deletions processor/transformprocessor/internal/traces/processor.go
Original file line number Diff line number Diff line change
@@ -18,43 +18,76 @@ import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottltraces"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

type Processor struct {
contexts []consumer.Traces
// Deprecated. Use contexts instead
statements []*ottl.Statement[ottltraces.TransformContext]
}

func NewProcessor(statements []string, settings component.TelemetrySettings) (*Processor, error) {
ottlp := ottltraces.NewParser(Functions(), settings)
parsedStatements, err := ottlp.ParseStatements(statements)
func NewProcessor(statements []string, contextStatements []common.ContextStatements, settings component.TelemetrySettings) (*Processor, error) {
if len(statements) > 0 {
ottlp := ottltraces.NewParser(Functions(), settings)
parsedStatements, err := ottlp.ParseStatements(statements)
if err != nil {
return nil, err
}
return &Processor{
statements: parsedStatements,
}, nil
}

pc, err := common.NewTraceParserCollection(Functions(), settings)
if err != nil {
return nil, err
}

contexts := make([]consumer.Traces, len(contextStatements))
for i, cs := range contextStatements {
context, err := pc.ParseContextStatements(cs)
if err != nil {
return nil, err
}
contexts[i] = context
}

return &Processor{
statements: parsedStatements,
contexts: contexts,
}, nil
}

func (p *Processor) ProcessTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) {
for i := 0; i < td.ResourceSpans().Len(); i++ {
rspans := td.ResourceSpans().At(i)
for j := 0; j < rspans.ScopeSpans().Len(); j++ {
sspan := rspans.ScopeSpans().At(j)
spans := sspan.Spans()
for k := 0; k < spans.Len(); k++ {
tCtx := ottltraces.NewTransformContext(spans.At(k), sspan.Scope(), rspans.Resource())
for _, statement := range p.statements {
_, _, err := statement.Execute(ctx, tCtx)
if err != nil {
return td, err
if len(p.statements) > 0 {
for i := 0; i < td.ResourceSpans().Len(); i++ {
rspans := td.ResourceSpans().At(i)
for j := 0; j < rspans.ScopeSpans().Len(); j++ {
sspan := rspans.ScopeSpans().At(j)
spans := sspan.Spans()
for k := 0; k < spans.Len(); k++ {
tCtx := ottltraces.NewTransformContext(spans.At(k), sspan.Scope(), rspans.Resource())
for _, statement := range p.statements {
_, _, err := statement.Execute(ctx, tCtx)
if err != nil {
return td, err
}
}
}
}
}
} else {
for _, c := range p.contexts {
err := c.ConsumeTraces(ctx, td)
if err != nil {
return td, err
}
}
}
return td, nil
}
170 changes: 166 additions & 4 deletions processor/transformprocessor/internal/traces/processor_test.go
Original file line number Diff line number Diff line change
@@ -23,6 +23,8 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

var (
@@ -37,7 +39,77 @@ var (
spanID2 = [8]byte{8, 7, 6, 5, 4, 3, 2, 1}
)

func TestProcess(t *testing.T) {
func Test_ProcessTraces_ResourceContext(t *testing.T) {
tests := []struct {
statement string
want func(td ptrace.Traces)
}{
{
statement: `set(attributes["test"], "pass")`,
want: func(td ptrace.Traces) {
td.ResourceSpans().At(0).Resource().Attributes().PutStr("test", "pass")
},
},
{
statement: `set(attributes["test"], "pass") where attributes["host.name"] == "wrong"`,
want: func(td ptrace.Traces) {
},
},
}

for _, tt := range tests {
t.Run(tt.statement, func(t *testing.T) {
td := constructTraces()
processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "resource", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)

_, err = processor.ProcessTraces(context.Background(), td)
assert.NoError(t, err)

exTd := constructTraces()
tt.want(exTd)

assert.Equal(t, exTd, td)
})
}
}

func Test_ProcessTraces_ScopeContext(t *testing.T) {
tests := []struct {
statement string
want func(td ptrace.Traces)
}{
{
statement: `set(attributes["test"], "pass") where name == "scope"`,
want: func(td ptrace.Traces) {
td.ResourceSpans().At(0).ScopeSpans().At(0).Scope().Attributes().PutStr("test", "pass")
},
},
{
statement: `set(attributes["test"], "pass") where version == 2`,
want: func(td ptrace.Traces) {
},
},
}

for _, tt := range tests {
t.Run(tt.statement, func(t *testing.T) {
td := constructTraces()
processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "scope", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)

_, err = processor.ProcessTraces(context.Background(), td)
assert.NoError(t, err)

exTd := constructTraces()
tt.want(exTd)

assert.Equal(t, exTd, td)
})
}
}

func Test_ProcessTraces_TraceContext(t *testing.T) {
tests := []struct {
statement string
want func(td ptrace.Traces)
@@ -245,7 +317,96 @@ func TestProcess(t *testing.T) {
for _, tt := range tests {
t.Run(tt.statement, func(t *testing.T) {
td := constructTraces()
processor, err := NewProcessor([]string{tt.statement}, componenttest.NewNopTelemetrySettings())
processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "trace", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)

_, err = processor.ProcessTraces(context.Background(), td)
assert.NoError(t, err)

exTd := constructTraces()
tt.want(exTd)

assert.Equal(t, exTd, td)
})
}
}

func Test_ProcessTraces_MixContext(t *testing.T) {
tests := []struct {
name string
contextStatments []common.ContextStatements
want func(td ptrace.Traces)
}{
{
name: "set resource and then use",
contextStatments: []common.ContextStatements{
{
Context: "resource",
Statements: []string{
`set(attributes["test"], "pass")`,
},
},
{
Context: "trace",
Statements: []string{
`set(attributes["test"], "pass") where resource.attributes["test"] == "pass"`,
},
},
},
want: func(td ptrace.Traces) {
td.ResourceSpans().At(0).Resource().Attributes().PutStr("test", "pass")
td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "pass")
td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1).Attributes().PutStr("test", "pass")
},
},
{
name: "set scope and then use",
contextStatments: []common.ContextStatements{
{
Context: "scope",
Statements: []string{
`set(attributes["test"], "pass")`,
},
},
{
Context: "trace",
Statements: []string{
`set(attributes["test"], "pass") where instrumentation_scope.attributes["test"] == "pass"`,
},
},
},
want: func(td ptrace.Traces) {
td.ResourceSpans().At(0).ScopeSpans().At(0).Scope().Attributes().PutStr("test", "pass")
td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "pass")
td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1).Attributes().PutStr("test", "pass")
},
},
{
name: "order matters",
contextStatments: []common.ContextStatements{
{
Context: "trace",
Statements: []string{
`set(attributes["test"], "pass") where instrumentation_scope.attributes["test"] == "pass"`,
},
},
{
Context: "scope",
Statements: []string{
`set(attributes["test"], "pass")`,
},
},
},
want: func(td ptrace.Traces) {
td.ResourceSpans().At(0).ScopeSpans().At(0).Scope().Attributes().PutStr("test", "pass")
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
td := constructTraces()
processor, err := NewProcessor(nil, tt.contextStatments, componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)

_, err = processor.ProcessTraces(context.Background(), td)
@@ -295,7 +456,7 @@ func BenchmarkTwoSpans(b *testing.B) {

for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
processor, err := NewProcessor(tt.statements, componenttest.NewNopTelemetrySettings())
processor, err := NewProcessor(tt.statements, nil, componenttest.NewNopTelemetrySettings())
assert.NoError(b, err)
b.ResetTimer()
for n := 0; n < b.N; n++ {
@@ -337,7 +498,7 @@ func BenchmarkHundredSpans(b *testing.B) {
}
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
processor, err := NewProcessor(tt.statements, componenttest.NewNopTelemetrySettings())
processor, err := NewProcessor(tt.statements, nil, componenttest.NewNopTelemetrySettings())
assert.NoError(b, err)
b.ResetTimer()
for n := 0; n < b.N; n++ {
@@ -354,6 +515,7 @@ func constructTraces() ptrace.Traces {
rs0 := td.ResourceSpans().AppendEmpty()
rs0.Resource().Attributes().PutStr("host.name", "localhost")
rs0ils0 := rs0.ScopeSpans().AppendEmpty()
rs0ils0.Scope().SetName("scope")
fillSpanOne(rs0ils0.Spans().AppendEmpty())
fillSpanTwo(rs0ils0.Spans().AppendEmpty())
return td
95 changes: 72 additions & 23 deletions processor/transformprocessor/testdata/config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,30 @@
transform:
trace_statements:
- context: trace
statements:
- set(name, "bear") where attributes["http.path"] == "/animal"
- keep_keys(attributes, ["http.method", "http.path"])
- context: resource
statements:
- set(attributes["name"], "bear")
metric_statements:
- context: datapoint
statements:
- set(metric.name, "bear") where attributes["http.path"] == "/animal"
- keep_keys(attributes, ["http.method", "http.path"])
- context: resource
statements:
- set(attributes["name"], "bear")
log_statements:
- context: log
statements:
- set(body, "bear") where attributes["http.path"] == "/animal"
- keep_keys(attributes, ["http.method", "http.path"])
- context: resource
statements:
- set(attributes["name"], "bear")

transform/deprecated_format:
traces:
statements:
- set(name, "bear") where attributes["http.path"] == "/animal"
@@ -12,38 +38,61 @@ transform:
- set(body, "bear") where attributes["http.path"] == "/animal"
- keep_keys(attributes, ["http.method", "http.path"])

transform/bad_syntax_log:
logs:
transform/using_both_formats:
trace_statements:
- context: trace
statements:
- set(name, "bear") where attributes["http.path"] == "/animal"
- keep_keys(attributes, ["http.method", "http.path"])
traces:
statements:
- set(body, "bear" where attributes["http.path"] == "/animal"
- set(name, "bear") where attributes["http.path"] == "/animal"
- keep_keys(attributes, ["http.method", "http.path"])

transform/bad_syntax_log:
log_statements:
- context: log
statements:
- set(body, "bear" where attributes["http.path"] == "/animal"
- keep_keys(attributes, ["http.method", "http.path"])

transform/bad_syntax_metric:
metrics:
statements:
- set(name, "bear" where attributes["http.path"] == "/animal"
- keep_keys(attributes, ["http.method", "http.path"])
metric_statements:
- context: datapoint
statements:
- set(name, "bear" where attributes["http.path"] == "/animal"
- keep_keys(attributes, ["http.method", "http.path"])

transform/bad_syntax_trace:
traces:
statements:
- set(name, "bear" where attributes["http.path"] == "/animal"
- keep_keys(attributes, ["http.method", "http.path"])
trace_statements:
- context: trace
statements:
- set(name, "bear" where attributes["http.path"] == "/animal"
- keep_keys(attributes, ["http.method", "http.path"])

transform/unknown_function_log:
logs:
statements:
- set(body, "bear") where attributes["http.path"] == "/animal"
- not_a_function(attributes, ["http.method", "http.path"])
log_statements:
- context: log
statements:
- set(body, "bear") where attributes["http.path"] == "/animal"
- not_a_function(attributes, ["http.method", "http.path"])

transform/unknown_function_metric:
metrics:
statements:
- set(metric.name, "bear") where attributes["http.path"] == "/animal"
- not_a_function(attributes, ["http.method", "http.path"])
metric_statements:
- context: datapoint
statements:
- set(metric.name, "bear") where attributes["http.path"] == "/animal"
- not_a_function(attributes, ["http.method", "http.path"])

transform/unknown_function_trace:
traces:
statements:
- set(name, "bear") where attributes["http.path"] == "/animal"
- not_a_function(attributes, ["http.method", "http.path"])
trace_statements:
- context: trace
statements:
- set(name, "bear") where attributes["http.path"] == "/animal"
- not_a_function(attributes, ["http.method", "http.path"])

transform/unknown_context:
trace_statements:
- context: test
statements:
- set(name, "bear") where attributes["http.path"] == "/animal"
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved