diff --git a/.chloggen/tp-enhanced-contexts.yaml b/.chloggen/tp-enhanced-contexts.yaml new file mode 100755 index 000000000000..9444d6a6a5f6 --- /dev/null +++ b/.chloggen/tp-enhanced-contexts.yaml @@ -0,0 +1,17 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: processor/transform + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Adds new configuration options that allow specifying the OTTL context to use when executing statements. See [Transform Processor README](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor#config) for more details. + +# One or more tracking issues related to the change +issues: [15381] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + The existing configuration options will be deprecated in a future release. diff --git a/processor/transformprocessor/README.md b/processor/transformprocessor/README.md index 32be946782aa..4b0dcbf97640 100644 --- a/processor/transformprocessor/README.md +++ b/processor/transformprocessor/README.md @@ -8,66 +8,147 @@ | Warnings | [Unsound Transformations, Identity Conflict, Orphaned Telemetry, Other](#warnings) | The transform processor modifies telemetry based on configuration using the [OpenTelemetry Transformation Language](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl). -The processor takes a list of statements for each signal type and executes the statements against the incoming telemetry in the order specified in the config. 
Each statement can access and transform telemetry using functions and allow the use of a condition to help decide whether the function should be executed. + +For each signal type, the processor takes a list of statements associated to a [Context type](#contexts) and executes the statements against the incoming telemetry in the order specified in the config. +Each statement can access and transform telemetry using functions and allows the use of a condition to help decide whether the function should be executed. ## Config -The transform processor allows configuring statements for traces, metrics, and logs. Each signal specifies a list of string statements that get passed to the OTTL for interpretation. +The transform processor allows configuring multiple context statements for traces, metrics, and logs. +The value of `context` specifies which [OTTL Context](#contexts) to use when interpreting the associated statements. +The statement strings, which must be OTTL compatible, will be passed to the OTTL and interpreted using the associated context. +Each context will be processed in the order specified and each statement for a context will be executed in the order specified. ```yaml transform: - : - statements: - - string - - string - - string + <trace|metric|log>_statements: + - context: string + statements: + - string + - string + - string + - context: string + statements: + - string + - string + - string ``` +Proper use of contexts will provide increased performance and capabilities. See [Contexts](#contexts) for more details. 
+ +Valid values for `context` are: + +| Signal | Context Values | +|-------------------|------------------------------------------------| +| trace_statements | `resource`, `scope`, `trace`, and `spanevent` | +| metric_statements | `resource`, `scope`, `metric`, and `datapoint` | +| log_statements | `resource`, `scope`, and `log` | + ## Example +The example takes advantage of context efficiency by grouping transformations with the context which it intends to transform. +See [Contexts](#contexts) for more details. + Example configuration: ```yaml transform: - traces: - statements: - - set(status.code, 1) where attributes["http.path"] == "/health" - - keep_keys(resource.attributes, ["service.name", "service.namespace", "cloud.region", "process.command_line"]) - - set(name, attributes["http.route"]) - - replace_match(attributes["http.target"], "/user/*/list/*", "/user/{userId}/list/{listId}") - - replace_pattern(resource.attributes["process.command_line"], "password\\=[^\\s]*(\\s?)", "password=***") - - limit(attributes, 100, []) - - limit(resource.attributes, 100, []) + trace_statements: + - context: resource + statements: + - keep_keys(attributes, ["service.name", "service.namespace", "cloud.region", "process.command_line"]) + - replace_pattern(attributes["process.command_line"], "password\\=[^\\s]*(\\s?)", "password=***") + - limit(attributes, 100, []) + - truncate_all(attributes, 4096) + - context: trace + statements: + - set(status.code, 1) where attributes["http.path"] == "/health" + - set(name, attributes["http.route"]) + - replace_match(attributes["http.target"], "/user/*/list/*", "/user/{userId}/list/{listId}") + - limit(attributes, 100, []) + - truncate_all(attributes, 4096) + + metric_statements: + - context: resource + statements: + - keep_keys(attributes, ["host.name"]) - truncate_all(attributes, 4096) - - truncate_all(resource.attributes, 4096) - metrics: - statements: - - set(metric.description, "Sum") where metric.type == "Sum" - - 
keep_keys(resource.attributes, ["host.name"]) - - limit(attributes, 100, ["host.name"]) - - truncate_all(attributes, 4096) - - truncate_all(resource.attributes, 4096) - - convert_sum_to_gauge() where metric.name == "system.processes.count" - - convert_gauge_to_sum("cumulative", false) where metric.name == "prometheus_metric" - logs: - statements: - - set(severity_text, "FAIL") where body == "request failed" - - replace_all_matches(attributes, "/user/*/list/*", "/user/{userId}/list/{listId}") - - replace_all_patterns(attributes, "/account/\\d{4}", "/account/{accountId}") - - set(body, attributes["http.route"]) - - keep_keys(resource.attributes, ["service.name", "service.namespace", "cloud.region"]) + - context: metric + statements: + - set(description, "Sum") where type == "Sum" + - context: datapoint + statements: + - limit(attributes, 100, ["host.name"]) + - truncate_all(attributes, 4096) + - convert_sum_to_gauge() where metric.name == "system.processes.count" + - convert_gauge_to_sum("cumulative", false) where metric.name == "prometheus_metric" + + log_statements: + - context: resource + statements: + - keep_keys(attributes, ["service.name", "service.namespace", "cloud.region"]) + - context: log + statements: + - set(severity_text, "FAIL") where body == "request failed" + - replace_all_matches(attributes, "/user/*/list/*", "/user/{userId}/list/{listId}") + - replace_all_patterns(attributes, "/account/\\d{4}", "/account/{accountId}") + - set(body, attributes["http.route"]) ``` + ## Grammar You can learn more in-depth details on the capabilities and limitations of the OpenTelemetry Transformation Language used by the transform processor by reading about its [grammar](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl#grammar). ## Contexts -The transform processor utilizes the OTTL's standard contexts for Traces, Metrics and Logs. The contexts allow the OTTL to interact with the underlying telemetry data in its pdata form. 
+The transform processor utilizes the OTTL's contexts to transform Resource, Scope, Trace, SpanEvent, Metric, DataPoint, and Log telemetry. +The contexts allow the OTTL to interact with the underlying telemetry data in its pdata form. +- [Resource Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottlresource) +- [Scope Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottlscope) - [Traces Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottltraces) -- [Metrics Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottldatapoints) +- [SpanEvent Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottlspanevent) +- [Metric Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottlmetric) +- [DataPoint Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottldatapoints) - [Logs Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottllogs) +Each context allows transformation of its type of telemetry. +For example, statements associated to a `resource` context will be able to transform the resource's `attributes` and `dropped_attributes_count`. + +Contexts __NEVER__ supply access to individual items "lower" in the protobuf definition. +- This means statements associated to a `resource` __WILL NOT__ be able to access the underlying instrumentation scopes. +- This means statements associated to a `scope` __WILL NOT__ be able to access the underlying telemetry slices (spans, metrics, or logs). +- Similarly, statements associated to a `metric` __WILL NOT__ be able to access individual datapoints, but can access the entire datapoints slice. 
+- Similarly, statements associated to a `trace` __WILL NOT__ be able to access individual SpanEvents, but can access the entire SpanEvents slice. + +For practical purposes, this means that a context cannot make decisions on its telemetry based on telemetry "lower" in the structure. +For example, __the following context statement is not possible__ because it attempts to use individual datapoint attributes in the condition of a statement that is associated to a `metric`. + +```yaml +metric_statements: +- context: metric + statements: + - set(description, "test passed") where datapoints.attributes["test"] == "pass" +``` + +Contexts __ALWAYS__ supply access to the items "higher" in the protobuf definition that are associated to the telemetry being transformed. +- This means that statements associated to a `datapoint` have access to a datapoint's metric, instrumentation scope, and resource. +- This means that statements associated to a `spanevent` have access to a spanevent's span, instrumentation scope, and resource. +- This means that statements associated to a `trace`/`metric`/`log` have access to the telemetry's instrumentation scope, and resource. +- This means that statements associated to a `scope` have access to the scope's resource. + +For example, __the following context statement is possible__ because `datapoint` statements can access the datapoint's metric. + +```yaml +metric_statements: +- context: datapoint + statements: + - set(metric.description, "test passed") where attributes["test"] == "pass" +``` + +Whenever possible, associate your statements to the context that the statement intends to transform. +Although you can modify resource attributes associated to a span using the `trace` context, it is more efficient to use the `resource` context. +This is because contexts are nested: the efficiency comes because higher-level contexts can avoid iterating through any of the contexts at a lower level. 
+ ## Supported functions: Since the transform processor utilizes the OTTL's contexts for Traces, Metrics, and Logs, it is able to utilize functions that expect pdata in addition to any common functions. These common functions can be used for any signal. diff --git a/processor/transformprocessor/config.go b/processor/transformprocessor/config.go index 3de35080eee2..7f970b0ec3b4 100644 --- a/processor/transformprocessor/config.go +++ b/processor/transformprocessor/config.go @@ -15,14 +15,16 @@ package transformprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor" import ( + "fmt" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" - "go.uber.org/multierr" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoints" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllogs" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottltraces" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces" @@ -31,6 +33,11 @@ import ( type Config struct { config.ProcessorSettings `mapstructure:",squash"` + TraceStatements []common.ContextStatements `mapstructure:"trace_statements"` + MetricStatements []common.ContextStatements `mapstructure:"metric_statements"` + LogStatements []common.ContextStatements `mapstructure:"log_statements"` + + // Deprecated. 
Use TraceStatements, MetricStatements, and LogStatements instead OTTLConfig `mapstructure:",squash"` } @@ -47,24 +54,73 @@ type SignalConfig struct { var _ component.ProcessorConfig = (*Config)(nil) func (c *Config) Validate() error { - var errors error + if (len(c.Traces.Statements) > 0 || len(c.Metrics.Statements) > 0 || len(c.Logs.Statements) > 0) && + (len(c.TraceStatements) > 0 || len(c.MetricStatements) > 0 || len(c.LogStatements) > 0) { + return fmt.Errorf("cannot use Traces, Metrics and/or Logs with TraceStatements, MetricStatements and/or LogStatements") + } + + if len(c.Traces.Statements) > 0 { + ottltracesp := ottltraces.NewParser(traces.Functions(), component.TelemetrySettings{Logger: zap.NewNop()}) + _, err := ottltracesp.ParseStatements(c.Traces.Statements) + if err != nil { + return err + } + } - ottltracesp := ottltraces.NewParser(traces.Functions(), component.TelemetrySettings{Logger: zap.NewNop()}) - _, err := ottltracesp.ParseStatements(c.Traces.Statements) - if err != nil { - errors = multierr.Append(errors, err) + if len(c.TraceStatements) > 0 { + pc, err := common.NewTraceParserCollection(traces.Functions(), component.TelemetrySettings{Logger: zap.NewNop()}) + if err != nil { + return err + } + for _, cs := range c.TraceStatements { + _, err = pc.ParseContextStatements(cs) + if err != nil { + return err + } + } } - ottlmetricsp := ottldatapoints.NewParser(metrics.Functions(), component.TelemetrySettings{Logger: zap.NewNop()}) - _, err = ottlmetricsp.ParseStatements(c.Metrics.Statements) - if err != nil { - errors = multierr.Append(errors, err) + if len(c.Metrics.Statements) > 0 { + ottlmetricsp := ottldatapoints.NewParser(metrics.Functions(), component.TelemetrySettings{Logger: zap.NewNop()}) + _, err := ottlmetricsp.ParseStatements(c.Metrics.Statements) + if err != nil { + return err + } } - ottllogsp := ottllogs.NewParser(logs.Functions(), component.TelemetrySettings{Logger: zap.NewNop()}) - _, err = 
ottllogsp.ParseStatements(c.Logs.Statements) - if err != nil { - errors = multierr.Append(errors, err) + if len(c.MetricStatements) > 0 { + pc, err := common.NewMetricParserCollection(metrics.Functions(), component.TelemetrySettings{Logger: zap.NewNop()}) + if err != nil { + return err + } + for _, cs := range c.MetricStatements { + _, err = pc.ParseContextStatements(cs) + if err != nil { + return err + } + } } - return errors + + if len(c.Logs.Statements) > 0 { + ottllogsp := ottllogs.NewParser(logs.Functions(), component.TelemetrySettings{Logger: zap.NewNop()}) + _, err := ottllogsp.ParseStatements(c.Logs.Statements) + if err != nil { + return err + } + } + + if len(c.LogStatements) > 0 { + pc, err := common.NewLogParserCollection(logs.Functions(), component.TelemetrySettings{Logger: zap.NewNop()}) + if err != nil { + return err + } + for _, cs := range c.LogStatements { + _, err = pc.ParseContextStatements(cs) + if err != nil { + return err + } + } + } + + return nil } diff --git a/processor/transformprocessor/config_test.go b/processor/transformprocessor/config_test.go index 9c53f1dd2bf6..f334dc9a6f81 100644 --- a/processor/transformprocessor/config_test.go +++ b/processor/transformprocessor/config_test.go @@ -19,10 +19,11 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/confmap/confmaptest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) func TestLoadConfig(t *testing.T) { @@ -37,6 +38,68 @@ func TestLoadConfig(t *testing.T) { id: component.NewIDWithName(typeStr, ""), expected: &Config{ ProcessorSettings: config.NewProcessorSettings(component.NewID(typeStr)), + OTTLConfig: OTTLConfig{ + Traces: SignalConfig{ + Statements: []string{}, + }, + Metrics: SignalConfig{ + Statements: []string{}, + }, + Logs: SignalConfig{ + Statements: 
[]string{}, + }, + }, + TraceStatements: []common.ContextStatements{ + { + Context: "trace", + Statements: []string{ + `set(name, "bear") where attributes["http.path"] == "/animal"`, + `keep_keys(attributes, ["http.method", "http.path"])`, + }, + }, + { + Context: "resource", + Statements: []string{ + `set(attributes["name"], "bear")`, + }, + }, + }, + MetricStatements: []common.ContextStatements{ + { + Context: "datapoint", + Statements: []string{ + `set(metric.name, "bear") where attributes["http.path"] == "/animal"`, + `keep_keys(attributes, ["http.method", "http.path"])`, + }, + }, + { + Context: "resource", + Statements: []string{ + `set(attributes["name"], "bear")`, + }, + }, + }, + LogStatements: []common.ContextStatements{ + { + Context: "log", + Statements: []string{ + `set(body, "bear") where attributes["http.path"] == "/animal"`, + `keep_keys(attributes, ["http.method", "http.path"])`, + }, + }, + { + Context: "resource", + Statements: []string{ + `set(attributes["name"], "bear")`, + }, + }, + }, + }, + }, + { + id: component.NewIDWithName(typeStr, "deprecated_format"), + expected: &Config{ + ProcessorSettings: config.NewProcessorSettings(component.NewIDWithName(typeStr, "")), OTTLConfig: OTTLConfig{ Traces: SignalConfig{ Statements: []string{ @@ -57,8 +120,15 @@ func TestLoadConfig(t *testing.T) { }, }, }, + TraceStatements: []common.ContextStatements{}, + MetricStatements: []common.ContextStatements{}, + LogStatements: []common.ContextStatements{}, }, }, + { + id: component.NewIDWithName(typeStr, "using_both_formats"), + errorMessage: "cannot use Traces, Metrics and/or Logs with TraceStatements, MetricStatements and/or LogStatements", + }, { id: component.NewIDWithName(typeStr, "bad_syntax_trace"), errorMessage: "1:18: unexpected token \"where\" (expected \")\")", @@ -67,7 +137,6 @@ func TestLoadConfig(t *testing.T) { id: component.NewIDWithName(typeStr, "unknown_function_trace"), errorMessage: "undefined function not_a_function", }, - { id: 
component.NewIDWithName(typeStr, "bad_syntax_metric"), errorMessage: "1:18: unexpected token \"where\" (expected \")\")", @@ -88,14 +157,14 @@ func TestLoadConfig(t *testing.T) { for _, tt := range tests { t.Run(tt.id.String(), func(t *testing.T) { cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) - require.NoError(t, err) + assert.NoError(t, err) factory := NewFactory() cfg := factory.CreateDefaultConfig() sub, err := cm.Sub(tt.id.String()) - require.NoError(t, err) - require.NoError(t, component.UnmarshalProcessorConfig(sub, cfg)) + assert.NoError(t, err) + assert.NoError(t, component.UnmarshalProcessorConfig(sub, cfg)) if tt.expected == nil { assert.EqualError(t, cfg.Validate(), tt.errorMessage) @@ -106,3 +175,17 @@ func TestLoadConfig(t *testing.T) { }) } } + +func Test_UnknownContextID(t *testing.T) { + id := component.NewIDWithName(typeStr, "unknown_context") + + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + assert.NoError(t, err) + + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + + sub, err := cm.Sub(id.String()) + assert.NoError(t, err) + assert.Error(t, component.UnmarshalProcessorConfig(sub, cfg)) +} diff --git a/processor/transformprocessor/factory.go b/processor/transformprocessor/factory.go index 9eda44d6ab2b..462e83dae611 100644 --- a/processor/transformprocessor/factory.go +++ b/processor/transformprocessor/factory.go @@ -23,6 +23,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/processor/processorhelper" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces" @@ -59,6 +60,9 @@ func 
createDefaultConfig() component.ProcessorConfig { Statements: []string{}, }, }, + TraceStatements: []common.ContextStatements{}, + MetricStatements: []common.ContextStatements{}, + LogStatements: []common.ContextStatements{}, } } @@ -70,7 +74,7 @@ func createLogsProcessor( ) (component.LogsProcessor, error) { oCfg := cfg.(*Config) - proc, err := logs.NewProcessor(oCfg.Logs.Statements, set.TelemetrySettings) + proc, err := logs.NewProcessor(oCfg.Logs.Statements, oCfg.LogStatements, set.TelemetrySettings) if err != nil { return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err) } @@ -91,7 +95,7 @@ func createTracesProcessor( ) (component.TracesProcessor, error) { oCfg := cfg.(*Config) - proc, err := traces.NewProcessor(oCfg.Traces.Statements, set.TelemetrySettings) + proc, err := traces.NewProcessor(oCfg.Traces.Statements, oCfg.TraceStatements, set.TelemetrySettings) if err != nil { return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err) } @@ -112,7 +116,7 @@ func createMetricsProcessor( ) (component.MetricsProcessor, error) { oCfg := cfg.(*Config) - proc, err := metrics.NewProcessor(oCfg.Metrics.Statements, set.TelemetrySettings) + proc, err := metrics.NewProcessor(oCfg.Metrics.Statements, oCfg.MetricStatements, set.TelemetrySettings) if err != nil { return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err) } diff --git a/processor/transformprocessor/factory_test.go b/processor/transformprocessor/factory_test.go index da28884a5565..7b6554be9201 100644 --- a/processor/transformprocessor/factory_test.go +++ b/processor/transformprocessor/factory_test.go @@ -26,6 +26,8 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) func TestFactory_Type(t *testing.T) { @@ -49,6 +51,9 @@ func 
TestFactory_CreateDefaultConfig(t *testing.T) { Statements: []string{}, }, }, + TraceStatements: []common.ContextStatements{}, + MetricStatements: []common.ContextStatements{}, + LogStatements: []common.ContextStatements{}, }) assert.NoError(t, componenttest.CheckConfigStruct(cfg)) } diff --git a/processor/transformprocessor/go.mod b/processor/transformprocessor/go.mod index c7b337437254..a948afc986fe 100644 --- a/processor/transformprocessor/go.mod +++ b/processor/transformprocessor/go.mod @@ -7,7 +7,6 @@ require ( github.com/stretchr/testify v1.8.1 go.opentelemetry.io/collector v0.64.0 go.opentelemetry.io/collector/pdata v0.64.0 - go.uber.org/multierr v1.8.0 go.uber.org/zap v1.23.0 ) @@ -35,6 +34,7 @@ require ( go.opentelemetry.io/otel/metric v0.33.0 // indirect go.opentelemetry.io/otel/trace v1.11.1 // indirect go.uber.org/atomic v1.10.0 // indirect + go.uber.org/multierr v1.8.0 // indirect golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 // indirect diff --git a/processor/transformprocessor/internal/logs/processor.go b/processor/transformprocessor/internal/logs/processor.go index e709098fd8f2..3d578b8e484a 100644 --- a/processor/transformprocessor/internal/logs/processor.go +++ b/processor/transformprocessor/internal/logs/processor.go @@ -18,43 +18,76 @@ import ( "context" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllogs" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) type Processor struct { + contexts []consumer.Logs + // Deprecated. 
Use contexts instead statements []*ottl.Statement[ottllogs.TransformContext] } -func NewProcessor(statements []string, settings component.TelemetrySettings) (*Processor, error) { - ottlp := ottllogs.NewParser(Functions(), settings) - parsedStatements, err := ottlp.ParseStatements(statements) +func NewProcessor(statements []string, contextStatements []common.ContextStatements, settings component.TelemetrySettings) (*Processor, error) { + if len(statements) > 0 { + ottlp := ottllogs.NewParser(Functions(), settings) + parsedStatements, err := ottlp.ParseStatements(statements) + if err != nil { + return nil, err + } + return &Processor{ + statements: parsedStatements, + }, nil + } + + pc, err := common.NewLogParserCollection(Functions(), settings) if err != nil { return nil, err } + + contexts := make([]consumer.Logs, len(contextStatements)) + for i, cs := range contextStatements { + context, err := pc.ParseContextStatements(cs) + if err != nil { + return nil, err + } + contexts[i] = context + } + return &Processor{ - statements: parsedStatements, + contexts: contexts, }, nil } -func (p *Processor) ProcessLogs(ctx context.Context, td plog.Logs) (plog.Logs, error) { - for i := 0; i < td.ResourceLogs().Len(); i++ { - rlogs := td.ResourceLogs().At(i) - for j := 0; j < rlogs.ScopeLogs().Len(); j++ { - slogs := rlogs.ScopeLogs().At(j) - logs := slogs.LogRecords() - for k := 0; k < logs.Len(); k++ { - tCtx := ottllogs.NewTransformContext(logs.At(k), slogs.Scope(), rlogs.Resource()) - for _, statement := range p.statements { - _, _, err := statement.Execute(ctx, tCtx) - if err != nil { - return td, err +func (p *Processor) ProcessLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) { + if len(p.statements) > 0 { + for i := 0; i < ld.ResourceLogs().Len(); i++ { + rlogs := ld.ResourceLogs().At(i) + for j := 0; j < rlogs.ScopeLogs().Len(); j++ { + slogs := rlogs.ScopeLogs().At(j) + logs := slogs.LogRecords() + for k := 0; k < logs.Len(); k++ { + tCtx := 
ottllogs.NewTransformContext(logs.At(k), slogs.Scope(), rlogs.Resource()) + for _, statement := range p.statements { + _, _, err := statement.Execute(ctx, tCtx) + if err != nil { + return ld, err + } } } } } + } else { + for _, c := range p.contexts { + err := c.ConsumeLogs(ctx, ld) + if err != nil { + return ld, err + } + } } - return td, nil + return ld, nil } diff --git a/processor/transformprocessor/internal/logs/processor_test.go b/processor/transformprocessor/internal/logs/processor_test.go index f72ab696a7e7..3a1e87754b21 100644 --- a/processor/transformprocessor/internal/logs/processor_test.go +++ b/processor/transformprocessor/internal/logs/processor_test.go @@ -23,6 +23,8 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) var ( @@ -36,7 +38,77 @@ var ( spanID = [8]byte{1, 2, 3, 4, 5, 6, 7, 8} ) -func TestProcess(t *testing.T) { +func Test_ProcessLogs_ResourceContext(t *testing.T) { + tests := []struct { + statement string + want func(td plog.Logs) + }{ + { + statement: `set(attributes["test"], "pass")`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).Resource().Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(attributes["test"], "pass") where attributes["host.name"] == "wrong"`, + want: func(td plog.Logs) { + }, + }, + } + + for _, tt := range tests { + t.Run(tt.statement, func(t *testing.T) { + td := constructLogs() + processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "resource", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessLogs(context.Background(), td) + assert.NoError(t, err) + + exTd := constructLogs() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + +func Test_ProcessLogs_ScopeContext(t 
*testing.T) { + tests := []struct { + statement string + want func(td plog.Logs) + }{ + { + statement: `set(attributes["test"], "pass") where name == "scope"`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(attributes["test"], "pass") where version == 2`, + want: func(td plog.Logs) { + }, + }, + } + + for _, tt := range tests { + t.Run(tt.statement, func(t *testing.T) { + td := constructLogs() + processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "scope", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessLogs(context.Background(), td) + assert.NoError(t, err) + + exTd := constructLogs() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + +func Test_ProcessLogs_LogContext(t *testing.T) { tests := []struct { statement string want func(td plog.Logs) @@ -198,7 +270,124 @@ func TestProcess(t *testing.T) { for _, tt := range tests { t.Run(tt.statement, func(t *testing.T) { td := constructLogs() - processor, err := NewProcessor([]string{tt.statement}, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "log", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessLogs(context.Background(), td) + assert.NoError(t, err) + + exTd := constructLogs() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + +func Test_ProcessLogs_MixContext(t *testing.T) { + tests := []struct { + name string + contextStatments []common.ContextStatements + want func(td plog.Logs) + }{ + { + name: "set resource and then use", + contextStatments: []common.ContextStatements{ + { + Context: "resource", + Statements: []string{ + `set(attributes["test"], "pass")`, + }, + }, + { + Context: "log", + Statements: []string{ + `set(attributes["test"], 
"pass") where resource.attributes["test"] == "pass"`, + }, + }, + }, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).Resource().Attributes().PutStr("test", "pass") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "pass") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Attributes().PutStr("test", "pass") + }, + }, + { + name: "set scope and then use", + contextStatments: []common.ContextStatements{ + { + Context: "scope", + Statements: []string{ + `set(attributes["test"], "pass")`, + }, + }, + { + Context: "log", + Statements: []string{ + `set(attributes["test"], "pass") where instrumentation_scope.attributes["test"] == "pass"`, + }, + }, + }, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Attributes().PutStr("test", "pass") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "pass") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Attributes().PutStr("test", "pass") + }, + }, + { + name: "order matters", + contextStatments: []common.ContextStatements{ + { + Context: "log", + Statements: []string{ + `set(attributes["test"], "pass") where instrumentation_scope.attributes["test"] == "pass"`, + }, + }, + { + Context: "scope", + Statements: []string{ + `set(attributes["test"], "pass")`, + }, + }, + }, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Attributes().PutStr("test", "pass") + }, + }, + { + name: "reuse context", + contextStatments: []common.ContextStatements{ + { + Context: "scope", + Statements: []string{ + `set(attributes["test"], "pass")`, + }, + }, + { + Context: "log", + Statements: []string{ + `set(attributes["test"], "pass") where instrumentation_scope.attributes["test"] == "pass"`, + }, + }, + { + Context: "scope", + Statements: []string{ + `set(attributes["test"], "fail")`, + }, + }, + }, + want: func(td plog.Logs) { + 
td.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Attributes().PutStr("test", "fail") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "pass") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Attributes().PutStr("test", "pass") + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + td := constructLogs() + processor, err := NewProcessor(nil, tt.contextStatments, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessLogs(context.Background(), td) @@ -217,6 +406,7 @@ func constructLogs() plog.Logs { rs0 := td.ResourceLogs().AppendEmpty() rs0.Resource().Attributes().PutStr("host.name", "localhost") rs0ils0 := rs0.ScopeLogs().AppendEmpty() + rs0ils0.Scope().SetName("scope") fillLogOne(rs0ils0.LogRecords().AppendEmpty()) fillLogTwo(rs0ils0.LogRecords().AppendEmpty()) return td diff --git a/processor/transformprocessor/internal/metrics/processor.go b/processor/transformprocessor/internal/metrics/processor.go index 66a05c239048..b8e4ae69e554 100644 --- a/processor/transformprocessor/internal/metrics/processor.go +++ b/processor/transformprocessor/internal/metrics/processor.go @@ -18,56 +18,89 @@ import ( "context" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoints" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) type Processor struct { + contexts []consumer.Metrics + // Deprecated. 
Use contexts instead statements []*ottl.Statement[ottldatapoints.TransformContext] } -func NewProcessor(statements []string, settings component.TelemetrySettings) (*Processor, error) { - ottlp := ottldatapoints.NewParser(Functions(), settings) - parsedStatements, err := ottlp.ParseStatements(statements) +func NewProcessor(statements []string, contextStatements []common.ContextStatements, settings component.TelemetrySettings) (*Processor, error) { + if len(statements) > 0 { + ottlp := ottldatapoints.NewParser(Functions(), settings) + parsedStatements, err := ottlp.ParseStatements(statements) + if err != nil { + return nil, err + } + return &Processor{ + statements: parsedStatements, + }, nil + } + + pc, err := common.NewMetricParserCollection(Functions(), settings) if err != nil { return nil, err } + + contexts := make([]consumer.Metrics, len(contextStatements)) + for i, cs := range contextStatements { + parsed, err := pc.ParseContextStatements(cs) + if err != nil { + return nil, err + } + contexts[i] = parsed + } + return &Processor{ - statements: parsedStatements, + contexts: contexts, }, nil } -func (p *Processor) ProcessMetrics(ctx context.Context, td pmetric.Metrics) (pmetric.Metrics, error) { - for i := 0; i < td.ResourceMetrics().Len(); i++ { - rmetrics := td.ResourceMetrics().At(i) - for j := 0; j < rmetrics.ScopeMetrics().Len(); j++ { - smetrics := rmetrics.ScopeMetrics().At(j) - metrics := smetrics.Metrics() - for k := 0; k < metrics.Len(); k++ { - metric := metrics.At(k) - var err error - switch metric.Type() { - case pmetric.MetricTypeSum: - err = p.handleNumberDataPoints(ctx, metric.Sum().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) - case pmetric.MetricTypeGauge: - err = p.handleNumberDataPoints(ctx, metric.Gauge().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) - case pmetric.MetricTypeHistogram: - err = p.handleHistogramDataPoints(ctx, metric.Histogram().DataPoints(), metrics.At(k), 
metrics, smetrics.Scope(), rmetrics.Resource()) - case pmetric.MetricTypeExponentialHistogram: - err = p.handleExponetialHistogramDataPoints(ctx, metric.ExponentialHistogram().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) - case pmetric.MetricTypeSummary: - err = p.handleSummaryDataPoints(ctx, metric.Summary().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) - } - if err != nil { - return td, err +func (p *Processor) ProcessMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { + if len(p.statements) > 0 { + for i := 0; i < md.ResourceMetrics().Len(); i++ { + rmetrics := md.ResourceMetrics().At(i) + for j := 0; j < rmetrics.ScopeMetrics().Len(); j++ { + smetrics := rmetrics.ScopeMetrics().At(j) + metrics := smetrics.Metrics() + for k := 0; k < metrics.Len(); k++ { + metric := metrics.At(k) + var err error + switch metric.Type() { + case pmetric.MetricTypeSum: + err = p.handleNumberDataPoints(ctx, metric.Sum().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) + case pmetric.MetricTypeGauge: + err = p.handleNumberDataPoints(ctx, metric.Gauge().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) + case pmetric.MetricTypeHistogram: + err = p.handleHistogramDataPoints(ctx, metric.Histogram().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) + case pmetric.MetricTypeExponentialHistogram: + err = p.handleExponetialHistogramDataPoints(ctx, metric.ExponentialHistogram().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) + case pmetric.MetricTypeSummary: + err = p.handleSummaryDataPoints(ctx, metric.Summary().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) + } + if err != nil { + return md, err + } } } } + } else { + for _, c := range p.contexts { + err := c.ConsumeMetrics(ctx, md) + if err != nil { + return md, err + } + } } - return td, nil + return md, nil } 
func (p *Processor) handleNumberDataPoints(ctx context.Context, dps pmetric.NumberDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { diff --git a/processor/transformprocessor/internal/metrics/processor_test.go b/processor/transformprocessor/internal/metrics/processor_test.go index eab03406f7ae..5bd6119fdb66 100644 --- a/processor/transformprocessor/internal/metrics/processor_test.go +++ b/processor/transformprocessor/internal/metrics/processor_test.go @@ -23,6 +23,8 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) var ( @@ -32,7 +34,77 @@ var ( TestTimeStamp = pcommon.NewTimestampFromTime(StartTime) ) -func TestProcess(t *testing.T) { +func Test_ProcessMetrics_ResourceContext(t *testing.T) { + tests := []struct { + statement string + want func(td pmetric.Metrics) + }{ + { + statement: `set(attributes["test"], "pass")`, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).Resource().Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(attributes["test"], "pass") where attributes["host.name"] == "wrong"`, + want: func(td pmetric.Metrics) { + }, + }, + } + + for _, tt := range tests { + t.Run(tt.statement, func(t *testing.T) { + td := constructMetrics() + processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "resource", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessMetrics(context.Background(), td) + assert.NoError(t, err) + + exTd := constructMetrics() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + +func Test_ProcessMetrics_ScopeContext(t *testing.T) { + tests := []struct { + statement string + want func(td 
pmetric.Metrics) + }{ + { + statement: `set(attributes["test"], "pass") where name == "scope"`, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(attributes["test"], "pass") where version == 2`, + want: func(td pmetric.Metrics) { + }, + }, + } + + for _, tt := range tests { + t.Run(tt.statement, func(t *testing.T) { + td := constructMetrics() + processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "scope", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessMetrics(context.Background(), td) + assert.NoError(t, err) + + exTd := constructMetrics() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + +func Test_ProcessMetrics_DataPointContext(t *testing.T) { tests := []struct { statements []string want func(pmetric.Metrics) @@ -371,7 +443,139 @@ func TestProcess(t *testing.T) { for _, tt := range tests { t.Run(tt.statements[0], func(t *testing.T) { td := constructMetrics() - processor, err := NewProcessor(tt.statements, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "datapoint", Statements: tt.statements}}, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessMetrics(context.Background(), td) + assert.NoError(t, err) + + exTd := constructMetrics() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + +func Test_ProcessMetrics_MixContext(t *testing.T) { + tests := []struct { + name string + contextStatments []common.ContextStatements + want func(td pmetric.Metrics) + }{ + { + name: "set resource and then use", + contextStatments: []common.ContextStatements{ + { + Context: "resource", + Statements: []string{ + `set(attributes["test"], "pass")`, + }, + }, + { + Context: "datapoint", + Statements: []string{ + `set(attributes["test"], "pass") 
where resource.attributes["test"] == "pass"`, + }, + }, + }, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).Resource().Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(1).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + name: "set scope and then use", + contextStatments: []common.ContextStatements{ + { + Context: "scope", + Statements: []string{ + `set(attributes["test"], "pass")`, + }, + }, + { + Context: "datapoint", + Statements: []string{ + `set(attributes["test"], "pass") where instrumentation_scope.attributes["test"] == "pass"`, + }, + }, + }, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(0).Attributes().PutStr("test", "pass") + 
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(1).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + name: "order matters", + contextStatments: []common.ContextStatements{ + { + Context: "datapoint", + Statements: []string{ + `set(attributes["test"], "pass") where instrumentation_scope.attributes["test"] == "pass"`, + }, + }, + { + Context: "scope", + Statements: []string{ + `set(attributes["test"], "pass")`, + }, + }, + }, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Attributes().PutStr("test", "pass") + }, + }, + { + name: "reuse context", + contextStatments: []common.ContextStatements{ + { + Context: "scope", + Statements: []string{ + `set(attributes["test"], "pass")`, + }, + }, + { + Context: "datapoint", + Statements: []string{ + `set(attributes["test"], "pass") where instrumentation_scope.attributes["test"] == "pass"`, + }, + }, + { + Context: "scope", + Statements: []string{ + `set(attributes["test"], "fail")`, + }, + }, + }, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Attributes().PutStr("test", "fail") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(0).Attributes().PutStr("test", "pass") + 
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(1).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("test", "pass") + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + td := constructMetrics() + processor, err := NewProcessor(nil, tt.contextStatments, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessMetrics(context.Background(), td) @@ -390,6 +594,7 @@ func constructMetrics() pmetric.Metrics { rm0 := td.ResourceMetrics().AppendEmpty() rm0.Resource().Attributes().PutStr("host.name", "myhost") rm0ils0 := rm0.ScopeMetrics().AppendEmpty() + rm0ils0.Scope().SetName("scope") fillMetricOne(rm0ils0.Metrics().AppendEmpty()) fillMetricTwo(rm0ils0.Metrics().AppendEmpty()) fillMetricThree(rm0ils0.Metrics().AppendEmpty()) diff --git a/processor/transformprocessor/internal/traces/processor.go b/processor/transformprocessor/internal/traces/processor.go index 1134b2fea2d0..ac6af5cc1403 100644 --- a/processor/transformprocessor/internal/traces/processor.go +++ b/processor/transformprocessor/internal/traces/processor.go @@ -18,43 +18,76 @@ import ( "context" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/ptrace" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottltraces" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) type Processor struct { + contexts 
[]consumer.Traces + // Deprecated. Use contexts instead statements []*ottl.Statement[ottltraces.TransformContext] } -func NewProcessor(statements []string, settings component.TelemetrySettings) (*Processor, error) { - ottlp := ottltraces.NewParser(Functions(), settings) - parsedStatements, err := ottlp.ParseStatements(statements) +func NewProcessor(statements []string, contextStatements []common.ContextStatements, settings component.TelemetrySettings) (*Processor, error) { + if len(statements) > 0 { + ottlp := ottltraces.NewParser(Functions(), settings) + parsedStatements, err := ottlp.ParseStatements(statements) + if err != nil { + return nil, err + } + return &Processor{ + statements: parsedStatements, + }, nil + } + + pc, err := common.NewTraceParserCollection(Functions(), settings) if err != nil { return nil, err } + + contexts := make([]consumer.Traces, len(contextStatements)) + for i, cs := range contextStatements { + parsed, err := pc.ParseContextStatements(cs) + if err != nil { + return nil, err + } + contexts[i] = parsed + } + return &Processor{ - statements: parsedStatements, + contexts: contexts, }, nil } func (p *Processor) ProcessTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { - for i := 0; i < td.ResourceSpans().Len(); i++ { - rspans := td.ResourceSpans().At(i) - for j := 0; j < rspans.ScopeSpans().Len(); j++ { - sspan := rspans.ScopeSpans().At(j) - spans := sspan.Spans() - for k := 0; k < spans.Len(); k++ { - tCtx := ottltraces.NewTransformContext(spans.At(k), sspan.Scope(), rspans.Resource()) - for _, statement := range p.statements { - _, _, err := statement.Execute(ctx, tCtx) - if err != nil { - return td, err + if len(p.statements) > 0 { + for i := 0; i < td.ResourceSpans().Len(); i++ { + rspans := td.ResourceSpans().At(i) + for j := 0; j < rspans.ScopeSpans().Len(); j++ { + sspan := rspans.ScopeSpans().At(j) + spans := sspan.Spans() + for k := 0; k < spans.Len(); k++ { + tCtx := 
ottltraces.NewTransformContext(spans.At(k), sspan.Scope(), rspans.Resource()) + for _, statement := range p.statements { + _, _, err := statement.Execute(ctx, tCtx) + if err != nil { + return td, err + } } } } } + } else { + for _, c := range p.contexts { + err := c.ConsumeTraces(ctx, td) + if err != nil { + return td, err + } + } } return td, nil } diff --git a/processor/transformprocessor/internal/traces/processor_test.go b/processor/transformprocessor/internal/traces/processor_test.go index c6d6b08922e9..32e502787068 100644 --- a/processor/transformprocessor/internal/traces/processor_test.go +++ b/processor/transformprocessor/internal/traces/processor_test.go @@ -23,6 +23,8 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) var ( @@ -37,7 +39,77 @@ var ( spanID2 = [8]byte{8, 7, 6, 5, 4, 3, 2, 1} ) -func TestProcess(t *testing.T) { +func Test_ProcessTraces_ResourceContext(t *testing.T) { + tests := []struct { + statement string + want func(td ptrace.Traces) + }{ + { + statement: `set(attributes["test"], "pass")`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).Resource().Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(attributes["test"], "pass") where attributes["host.name"] == "wrong"`, + want: func(td ptrace.Traces) { + }, + }, + } + + for _, tt := range tests { + t.Run(tt.statement, func(t *testing.T) { + td := constructTraces() + processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "resource", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessTraces(context.Background(), td) + assert.NoError(t, err) + + exTd := constructTraces() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + +func 
Test_ProcessTraces_ScopeContext(t *testing.T) { + tests := []struct { + statement string + want func(td ptrace.Traces) + }{ + { + statement: `set(attributes["test"], "pass") where name == "scope"`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Scope().Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(attributes["test"], "pass") where version == 2`, + want: func(td ptrace.Traces) { + }, + }, + } + + for _, tt := range tests { + t.Run(tt.statement, func(t *testing.T) { + td := constructTraces() + processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "scope", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessTraces(context.Background(), td) + assert.NoError(t, err) + + exTd := constructTraces() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + +func Test_ProcessTraces_TraceContext(t *testing.T) { tests := []struct { statement string want func(td ptrace.Traces) @@ -245,7 +317,124 @@ func TestProcess(t *testing.T) { for _, tt := range tests { t.Run(tt.statement, func(t *testing.T) { td := constructTraces() - processor, err := NewProcessor([]string{tt.statement}, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "trace", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessTraces(context.Background(), td) + assert.NoError(t, err) + + exTd := constructTraces() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + +func Test_ProcessTraces_MixContext(t *testing.T) { + tests := []struct { + name string + contextStatments []common.ContextStatements + want func(td ptrace.Traces) + }{ + { + name: "set resource and then use", + contextStatments: []common.ContextStatements{ + { + Context: "resource", + Statements: []string{ + `set(attributes["test"], "pass")`, + }, + 
}, + { + Context: "trace", + Statements: []string{ + `set(attributes["test"], "pass") where resource.attributes["test"] == "pass"`, + }, + }, + }, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).Resource().Attributes().PutStr("test", "pass") + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "pass") + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1).Attributes().PutStr("test", "pass") + }, + }, + { + name: "set scope and then use", + contextStatments: []common.ContextStatements{ + { + Context: "scope", + Statements: []string{ + `set(attributes["test"], "pass")`, + }, + }, + { + Context: "trace", + Statements: []string{ + `set(attributes["test"], "pass") where instrumentation_scope.attributes["test"] == "pass"`, + }, + }, + }, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Scope().Attributes().PutStr("test", "pass") + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "pass") + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1).Attributes().PutStr("test", "pass") + }, + }, + { + name: "order matters", + contextStatments: []common.ContextStatements{ + { + Context: "trace", + Statements: []string{ + `set(attributes["test"], "pass") where instrumentation_scope.attributes["test"] == "pass"`, + }, + }, + { + Context: "scope", + Statements: []string{ + `set(attributes["test"], "pass")`, + }, + }, + }, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Scope().Attributes().PutStr("test", "pass") + }, + }, + { + name: "reuse context", + contextStatments: []common.ContextStatements{ + { + Context: "scope", + Statements: []string{ + `set(attributes["test"], "pass")`, + }, + }, + { + Context: "trace", + Statements: []string{ + `set(attributes["test"], "pass") where instrumentation_scope.attributes["test"] == "pass"`, + }, + }, + { + Context: "scope", + Statements: []string{ + `set(attributes["test"], "fail")`, + }, + }, 
+ }, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Scope().Attributes().PutStr("test", "fail") + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "pass") + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1).Attributes().PutStr("test", "pass") + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + td := constructTraces() + processor, err := NewProcessor(nil, tt.contextStatments, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessTraces(context.Background(), td) @@ -295,7 +484,7 @@ func BenchmarkTwoSpans(b *testing.B) { for _, tt := range tests { b.Run(tt.name, func(b *testing.B) { - processor, err := NewProcessor(tt.statements, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor(tt.statements, nil, componenttest.NewNopTelemetrySettings()) assert.NoError(b, err) b.ResetTimer() for n := 0; n < b.N; n++ { @@ -337,7 +526,7 @@ func BenchmarkHundredSpans(b *testing.B) { } for _, tt := range tests { b.Run(tt.name, func(b *testing.B) { - processor, err := NewProcessor(tt.statements, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor(tt.statements, nil, componenttest.NewNopTelemetrySettings()) assert.NoError(b, err) b.ResetTimer() for n := 0; n < b.N; n++ { @@ -354,6 +543,7 @@ func constructTraces() ptrace.Traces { rs0 := td.ResourceSpans().AppendEmpty() rs0.Resource().Attributes().PutStr("host.name", "localhost") rs0ils0 := rs0.ScopeSpans().AppendEmpty() + rs0ils0.Scope().SetName("scope") fillSpanOne(rs0ils0.Spans().AppendEmpty()) fillSpanTwo(rs0ils0.Spans().AppendEmpty()) return td diff --git a/processor/transformprocessor/testdata/config.yaml b/processor/transformprocessor/testdata/config.yaml index f7d68f80a070..ba38bcf0a157 100644 --- a/processor/transformprocessor/testdata/config.yaml +++ b/processor/transformprocessor/testdata/config.yaml @@ -1,4 +1,30 @@ transform: + 
trace_statements: + - context: trace + statements: + - set(name, "bear") where attributes["http.path"] == "/animal" + - keep_keys(attributes, ["http.method", "http.path"]) + - context: resource + statements: + - set(attributes["name"], "bear") + metric_statements: + - context: datapoint + statements: + - set(metric.name, "bear") where attributes["http.path"] == "/animal" + - keep_keys(attributes, ["http.method", "http.path"]) + - context: resource + statements: + - set(attributes["name"], "bear") + log_statements: + - context: log + statements: + - set(body, "bear") where attributes["http.path"] == "/animal" + - keep_keys(attributes, ["http.method", "http.path"]) + - context: resource + statements: + - set(attributes["name"], "bear") + +transform/deprecated_format: traces: statements: - set(name, "bear") where attributes["http.path"] == "/animal" @@ -12,38 +38,61 @@ transform: - set(body, "bear") where attributes["http.path"] == "/animal" - keep_keys(attributes, ["http.method", "http.path"]) -transform/bad_syntax_log: - logs: +transform/using_both_formats: + trace_statements: + - context: trace + statements: + - set(name, "bear") where attributes["http.path"] == "/animal" + - keep_keys(attributes, ["http.method", "http.path"]) + traces: statements: - - set(body, "bear" where attributes["http.path"] == "/animal" + - set(name, "bear") where attributes["http.path"] == "/animal" - keep_keys(attributes, ["http.method", "http.path"]) +transform/bad_syntax_log: + log_statements: + - context: log + statements: + - set(body, "bear" where attributes["http.path"] == "/animal" + - keep_keys(attributes, ["http.method", "http.path"]) + transform/bad_syntax_metric: - metrics: - statements: - - set(name, "bear" where attributes["http.path"] == "/animal" - - keep_keys(attributes, ["http.method", "http.path"]) + metric_statements: + - context: datapoint + statements: + - set(name, "bear" where attributes["http.path"] == "/animal" + - keep_keys(attributes, ["http.method", 
"http.path"]) transform/bad_syntax_trace: - traces: - statements: - - set(name, "bear" where attributes["http.path"] == "/animal" - - keep_keys(attributes, ["http.method", "http.path"]) + trace_statements: + - context: trace + statements: + - set(name, "bear" where attributes["http.path"] == "/animal" + - keep_keys(attributes, ["http.method", "http.path"]) transform/unknown_function_log: - logs: - statements: - - set(body, "bear") where attributes["http.path"] == "/animal" - - not_a_function(attributes, ["http.method", "http.path"]) + log_statements: + - context: log + statements: + - set(body, "bear") where attributes["http.path"] == "/animal" + - not_a_function(attributes, ["http.method", "http.path"]) transform/unknown_function_metric: - metrics: - statements: - - set(metric.name, "bear") where attributes["http.path"] == "/animal" - - not_a_function(attributes, ["http.method", "http.path"]) + metric_statements: + - context: datapoint + statements: + - set(metric.name, "bear") where attributes["http.path"] == "/animal" + - not_a_function(attributes, ["http.method", "http.path"]) transform/unknown_function_trace: - traces: - statements: - - set(name, "bear") where attributes["http.path"] == "/animal" - - not_a_function(attributes, ["http.method", "http.path"]) + trace_statements: + - context: trace + statements: + - set(name, "bear") where attributes["http.path"] == "/animal" + - not_a_function(attributes, ["http.method", "http.path"]) + +transform/unknown_context: + trace_statements: + - context: test + statements: + - set(name, "bear") where attributes["http.path"] == "/animal"