diff --git a/.chloggen/tp-remove-old-config.yaml b/.chloggen/tp-remove-old-config.yaml new file mode 100755 index 000000000000..02eaec6d0c45 --- /dev/null +++ b/.chloggen/tp-remove-old-config.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: transformprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Remove deprecated config options. Use `[trace|metric|log]_statements` instead. + +# One or more tracking issues related to the change +issues: [16773] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/processor/transformprocessor/config.go b/processor/transformprocessor/config.go index e7540f2d0f7b..a2c91ac18633 100644 --- a/processor/transformprocessor/config.go +++ b/processor/transformprocessor/config.go @@ -15,14 +15,9 @@ package transformprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor" import ( - "fmt" - "go.opentelemetry.io/collector/component" "go.uber.org/zap" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics" @@ -33,9 +28,6 @@ type Config struct { TraceStatements []common.ContextStatements `mapstructure:"trace_statements"` MetricStatements []common.ContextStatements `mapstructure:"metric_statements"` LogStatements []common.ContextStatements `mapstructure:"log_statements"` - - // Deprecated. Use TraceStatements, MetricStatements, and LogStatements instead - OTTLConfig `mapstructure:",squash"` } type OTTLConfig struct { @@ -51,19 +43,6 @@ type SignalConfig struct { var _ component.Config = (*Config)(nil) func (c *Config) Validate() error { - if (len(c.Traces.Statements) > 0 || len(c.Metrics.Statements) > 0 || len(c.Logs.Statements) > 0) && - (len(c.TraceStatements) > 0 || len(c.MetricStatements) > 0 || len(c.LogStatements) > 0) { - return fmt.Errorf("cannot use Traces, Metrics and/or Logs with TraceStatements, MetricStatements and/or LogStatements") - } - - if len(c.Traces.Statements) > 0 { - ottlspanp := ottlspan.NewParser(traces.SpanFunctions(), component.TelemetrySettings{Logger: zap.NewNop()}) - _, err := ottlspanp.ParseStatements(c.Traces.Statements) - if err != nil { - return err - } - } - if len(c.TraceStatements) > 0 { pc, err := common.NewTraceParserCollection(component.TelemetrySettings{Logger: zap.NewNop()}, common.WithSpanParser(traces.SpanFunctions()), common.WithSpanEventParser(traces.SpanEventFunctions())) if err != nil { @@ -77,14 +56,6 @@ func (c *Config) Validate() error { } } - if len(c.Metrics.Statements) > 0 { - ottlmetricsp := ottldatapoint.NewParser(metrics.DataPointFunctions(), component.TelemetrySettings{Logger: zap.NewNop()}) - _, err := ottlmetricsp.ParseStatements(c.Metrics.Statements) - if err != nil { - return err - } - } - if len(c.MetricStatements) > 0 { pc, err := common.NewMetricParserCollection(component.TelemetrySettings{Logger: zap.NewNop()}, common.WithMetricParser(metrics.MetricFunctions()), common.WithDataPointParser(metrics.DataPointFunctions())) if err != nil { @@ -98,14 +69,6 @@ func (c *Config) Validate() error { } } - if len(c.Logs.Statements) > 0 { - ottllogsp := ottllog.NewParser(logs.LogFunctions(), component.TelemetrySettings{Logger: zap.NewNop()}) - _, err := ottllogsp.ParseStatements(c.Logs.Statements) - if err != nil { - return err - } - } - if len(c.LogStatements) > 0 { pc, err := common.NewLogParserCollection(component.TelemetrySettings{Logger: zap.NewNop()}, common.WithLogParser(logs.LogFunctions())) if err != nil { diff --git a/processor/transformprocessor/config_test.go b/processor/transformprocessor/config_test.go index 6b35522e7c01..d0ec9748cf7f 100644 --- a/processor/transformprocessor/config_test.go +++ b/processor/transformprocessor/config_test.go @@ -36,17 +36,6 @@ func TestLoadConfig(t *testing.T) { { id: component.NewIDWithName(typeStr, ""), expected: &Config{ - OTTLConfig: OTTLConfig{ - Traces: SignalConfig{ - Statements: []string{}, - }, - Metrics: SignalConfig{ - Statements: []string{}, - }, - Logs: SignalConfig{ - Statements: []string{}, - }, - }, TraceStatements: []common.ContextStatements{ { Context: "span", @@ -94,38 +83,6 @@ func TestLoadConfig(t *testing.T) { }, }, }, - { - id: component.NewIDWithName(typeStr, "deprecated_format"), - expected: &Config{ - OTTLConfig: OTTLConfig{ - Traces: SignalConfig{ - Statements: []string{ - `set(name, "bear") where attributes["http.path"] == "/animal"`, - `keep_keys(attributes, ["http.method", "http.path"])`, - }, - }, - Metrics: SignalConfig{ - Statements: []string{ - `set(metric.name, "bear") where attributes["http.path"] == "/animal"`, - `keep_keys(attributes, ["http.method", "http.path"])`, - }, - }, - Logs: SignalConfig{ - Statements: []string{ - `set(body, "bear") where attributes["http.path"] == "/animal"`, - `keep_keys(attributes, ["http.method", "http.path"])`, - }, - }, - }, - TraceStatements: []common.ContextStatements{}, - MetricStatements: []common.ContextStatements{}, - LogStatements: []common.ContextStatements{}, - }, - }, - { - id: component.NewIDWithName(typeStr, "using_both_formats"), - errorMessage: "cannot use Traces, Metrics and/or Logs with TraceStatements, MetricStatements and/or LogStatements", - }, { id: component.NewIDWithName(typeStr, "bad_syntax_trace"), errorMessage: "unable to parse OTTL statement: 1:18: unexpected token \"where\" (expected \")\")", diff --git a/processor/transformprocessor/factory.go b/processor/transformprocessor/factory.go index 550a955b6f4d..67a0fcc63aad 100644 --- a/processor/transformprocessor/factory.go +++ b/processor/transformprocessor/factory.go @@ -48,17 +48,6 @@ func NewFactory() processor.Factory { func createDefaultConfig() component.Config { return &Config{ - OTTLConfig: OTTLConfig{ - Logs: SignalConfig{ - Statements: []string{}, - }, - Traces: SignalConfig{ - Statements: []string{}, - }, - Metrics: SignalConfig{ - Statements: []string{}, - }, - }, TraceStatements: []common.ContextStatements{}, MetricStatements: []common.ContextStatements{}, LogStatements: []common.ContextStatements{}, @@ -73,7 +62,7 @@ func createLogsProcessor( ) (processor.Logs, error) { oCfg := cfg.(*Config) - proc, err := logs.NewProcessor(oCfg.Logs.Statements, oCfg.LogStatements, set.TelemetrySettings) + proc, err := logs.NewProcessor(oCfg.LogStatements, set.TelemetrySettings) if err != nil { return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err) } @@ -94,7 +83,7 @@ func createTracesProcessor( ) (processor.Traces, error) { oCfg := cfg.(*Config) - proc, err := traces.NewProcessor(oCfg.Traces.Statements, oCfg.TraceStatements, set.TelemetrySettings) + proc, err := traces.NewProcessor(oCfg.TraceStatements, set.TelemetrySettings) if err != nil { return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err) } @@ -115,7 +104,7 @@ func createMetricsProcessor( ) (processor.Metrics, error) { oCfg := cfg.(*Config) - proc, err := metrics.NewProcessor(oCfg.Metrics.Statements, oCfg.MetricStatements, set.TelemetrySettings) + proc, err := metrics.NewProcessor(oCfg.MetricStatements, set.TelemetrySettings) if err != nil { return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err) } diff --git a/processor/transformprocessor/factory_test.go b/processor/transformprocessor/factory_test.go index 0612e0cb68e3..54f687100b61 100644 --- a/processor/transformprocessor/factory_test.go +++ b/processor/transformprocessor/factory_test.go @@ -39,17 +39,6 @@ func TestFactory_CreateDefaultConfig(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig() assert.Equal(t, cfg, &Config{ - OTTLConfig: OTTLConfig{ - Traces: SignalConfig{ - Statements: []string{}, - }, - Metrics: SignalConfig{ - Statements: []string{}, - }, - Logs: SignalConfig{ - Statements: []string{}, - }, - }, TraceStatements: []common.ContextStatements{}, MetricStatements: []common.ContextStatements{}, LogStatements: []common.ContextStatements{}, @@ -68,7 +57,12 @@ func TestFactoryCreateTracesProcessor_InvalidActions(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig() oCfg := cfg.(*Config) - oCfg.Traces.Statements = []string{`set(123`} + oCfg.TraceStatements = []common.ContextStatements{ + { + Context: "span", + Statements: []string{`set(123`}, + }, + } ap, err := factory.CreateTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop()) assert.Error(t, err) assert.Nil(t, ap) @@ -78,8 +72,12 @@ func TestFactoryCreateTracesProcessor(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig() oCfg := cfg.(*Config) - oCfg.Traces.Statements = []string{`set(attributes["test"], "pass") where name == "operationA"`} - + oCfg.TraceStatements = []common.ContextStatements{ + { + Context: "span", + Statements: []string{`set(attributes["test"], "pass") where name == "operationA"`}, + }, + } tp, err := factory.CreateTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop()) assert.NotNil(t, tp) assert.NoError(t, err) @@ -103,7 +101,12 @@ func TestFactoryCreateMetricsProcessor_InvalidActions(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig() oCfg := cfg.(*Config) - oCfg.Metrics.Statements = []string{`set(123`} + oCfg.MetricStatements = []common.ContextStatements{ + { + Context: "datapoint", + Statements: []string{`set(123`}, + }, + } ap, err := factory.CreateMetricsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop()) assert.Error(t, err) assert.Nil(t, ap) @@ -113,8 +116,12 @@ func TestFactoryCreateMetricsProcessor(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig() oCfg := cfg.(*Config) - oCfg.Metrics.Statements = []string{`set(attributes["test"], "pass") where metric.name == "operationA"`} - + oCfg.MetricStatements = []common.ContextStatements{ + { + Context: "datapoint", + Statements: []string{`set(attributes["test"], "pass") where metric.name == "operationA"`}, + }, + } metricsProcessor, err := factory.CreateMetricsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop()) assert.NotNil(t, metricsProcessor) assert.NoError(t, err) @@ -138,8 +145,12 @@ func TestFactoryCreateLogsProcessor(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig() oCfg := cfg.(*Config) - oCfg.Logs.Statements = []string{`set(attributes["test"], "pass") where body == "operationA"`} - + oCfg.LogStatements = []common.ContextStatements{ + { + Context: "log", + Statements: []string{`set(attributes["test"], "pass") where body == "operationA"`}, + }, + } lp, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop()) assert.NotNil(t, lp) assert.NoError(t, err) @@ -163,7 +174,12 @@ func TestFactoryCreateLogsProcessor_InvalidActions(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig() oCfg := cfg.(*Config) - oCfg.Logs.Statements = []string{`set(123`} + oCfg.LogStatements = []common.ContextStatements{ + { + Context: "log", + Statements: []string{`set(123`}, + }, + } ap, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop()) assert.Error(t, err) assert.Nil(t, ap) diff --git a/processor/transformprocessor/internal/logs/processor.go b/processor/transformprocessor/internal/logs/processor.go index f89939c8ca26..aac49f116718 100644 --- a/processor/transformprocessor/internal/logs/processor.go +++ b/processor/transformprocessor/internal/logs/processor.go @@ -21,29 +21,14 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) type Processor struct { contexts []consumer.Logs - // Deprecated. Use contexts instead - statements []*ottl.Statement[ottllog.TransformContext] } -func NewProcessor(statements []string, contextStatements []common.ContextStatements, settings component.TelemetrySettings) (*Processor, error) { - if len(statements) > 0 { - ottlp := ottllog.NewParser(LogFunctions(), settings) - parsedStatements, err := ottlp.ParseStatements(statements) - if err != nil { - return nil, err - } - return &Processor{ - statements: parsedStatements, - }, nil - } - +func NewProcessor(contextStatements []common.ContextStatements, settings component.TelemetrySettings) (*Processor, error) { pc, err := common.NewLogParserCollection(settings, common.WithLogParser(LogFunctions())) if err != nil { return nil, err @@ -64,29 +49,10 @@ func NewProcessor(statements []string, contextStatements []common.ContextStateme } func (p *Processor) ProcessLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) { - if len(p.statements) > 0 { - for i := 0; i < ld.ResourceLogs().Len(); i++ { - rlogs := ld.ResourceLogs().At(i) - for j := 0; j < rlogs.ScopeLogs().Len(); j++ { - slogs := rlogs.ScopeLogs().At(j) - logs := slogs.LogRecords() - for k := 0; k < logs.Len(); k++ { - tCtx := ottllog.NewTransformContext(logs.At(k), slogs.Scope(), rlogs.Resource()) - for _, statement := range p.statements { - _, _, err := statement.Execute(ctx, tCtx) - if err != nil { - return ld, err - } - } - } - } - } - } else { - for _, c := range p.contexts { - err := c.ConsumeLogs(ctx, ld) - if err != nil { - return ld, err - } + for _, c := range p.contexts { + err := c.ConsumeLogs(ctx, ld) + if err != nil { + return ld, err } } return ld, nil diff --git a/processor/transformprocessor/internal/logs/processor_test.go b/processor/transformprocessor/internal/logs/processor_test.go index b17a8da0fcd1..f15b320320a3 100644 --- a/processor/transformprocessor/internal/logs/processor_test.go +++ b/processor/transformprocessor/internal/logs/processor_test.go @@ -59,7 +59,7 @@ func Test_ProcessLogs_ResourceContext(t *testing.T) { for _, tt := range tests { t.Run(tt.statement, func(t *testing.T) { td := constructLogs() - processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "resource", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "resource", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessLogs(context.Background(), td) @@ -94,7 +94,7 @@ func Test_ProcessLogs_ScopeContext(t *testing.T) { for _, tt := range tests { t.Run(tt.statement, func(t *testing.T) { td := constructLogs() - processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "scope", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "scope", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessLogs(context.Background(), td) @@ -309,7 +309,7 @@ func Test_ProcessLogs_LogContext(t *testing.T) { for _, tt := range tests { t.Run(tt.statement, func(t *testing.T) { td := constructLogs() - processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "log", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "log", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessLogs(context.Background(), td) @@ -426,7 +426,7 @@ func Test_ProcessLogs_MixContext(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { td := constructLogs() - processor, err := NewProcessor(nil, tt.contextStatments, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor(tt.contextStatments, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessLogs(context.Background(), td) diff --git a/processor/transformprocessor/internal/metrics/processor.go b/processor/transformprocessor/internal/metrics/processor.go index f84bdbfaca19..835aaafdb8b8 100644 --- a/processor/transformprocessor/internal/metrics/processor.go +++ b/processor/transformprocessor/internal/metrics/processor.go @@ -19,32 +19,16 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) type Processor struct { contexts []consumer.Metrics - // Deprecated. Use contexts instead - statements []*ottl.Statement[ottldatapoint.TransformContext] } -func NewProcessor(statements []string, contextStatements []common.ContextStatements, settings component.TelemetrySettings) (*Processor, error) { - if len(statements) > 0 { - ottlp := ottldatapoint.NewParser(DataPointFunctions(), settings) - parsedStatements, err := ottlp.ParseStatements(statements) - if err != nil { - return nil, err - } - return &Processor{ - statements: parsedStatements, - }, nil - } - +func NewProcessor(contextStatements []common.ContextStatements, settings component.TelemetrySettings) (*Processor, error) { pc, err := common.NewMetricParserCollection(settings, common.WithMetricParser(MetricFunctions()), common.WithDataPointParser(DataPointFunctions())) if err != nil { return nil, err @@ -65,94 +49,11 @@ func NewProcessor(statements []string, contextStatements []common.ContextStateme } func (p *Processor) ProcessMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { - if len(p.statements) > 0 { - for i := 0; i < md.ResourceMetrics().Len(); i++ { - rmetrics := md.ResourceMetrics().At(i) - for j := 0; j < rmetrics.ScopeMetrics().Len(); j++ { - smetrics := rmetrics.ScopeMetrics().At(j) - metrics := smetrics.Metrics() - for k := 0; k < metrics.Len(); k++ { - metric := metrics.At(k) - var err error - switch metric.Type() { - case pmetric.MetricTypeSum: - err = p.handleNumberDataPoints(ctx, metric.Sum().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) - case pmetric.MetricTypeGauge: - err = p.handleNumberDataPoints(ctx, metric.Gauge().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) - case pmetric.MetricTypeHistogram: - err = p.handleHistogramDataPoints(ctx, metric.Histogram().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) - case pmetric.MetricTypeExponentialHistogram: - err = p.handleExponetialHistogramDataPoints(ctx, metric.ExponentialHistogram().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) - case pmetric.MetricTypeSummary: - err = p.handleSummaryDataPoints(ctx, metric.Summary().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) - } - if err != nil { - return md, err - } - } - } - } - } else { - for _, c := range p.contexts { - err := c.ConsumeMetrics(ctx, md) - if err != nil { - return md, err - } - } - } - return md, nil -} - -func (p *Processor) handleNumberDataPoints(ctx context.Context, dps pmetric.NumberDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { - for i := 0; i < dps.Len(); i++ { - tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource) - err := p.callFunctions(ctx, tCtx) - if err != nil { - return err - } - } - return nil -} - -func (p *Processor) handleHistogramDataPoints(ctx context.Context, dps pmetric.HistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { - for i := 0; i < dps.Len(); i++ { - tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource) - err := p.callFunctions(ctx, tCtx) + for _, c := range p.contexts { + err := c.ConsumeMetrics(ctx, md) if err != nil { - return err + return md, err } } - return nil -} - -func (p *Processor) handleExponetialHistogramDataPoints(ctx context.Context, dps pmetric.ExponentialHistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { - for i := 0; i < dps.Len(); i++ { - tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource) - err := p.callFunctions(ctx, tCtx) - if err != nil { - return err - } - } - return nil -} - -func (p *Processor) handleSummaryDataPoints(ctx context.Context, dps pmetric.SummaryDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { - for i := 0; i < dps.Len(); i++ { - tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource) - err := p.callFunctions(ctx, tCtx) - if err != nil { - return err - } - } - return nil -} - -func (p *Processor) callFunctions(ctx context.Context, tCtx ottldatapoint.TransformContext) error { - for _, statement := range p.statements { - _, _, err := statement.Execute(ctx, tCtx) - if err != nil { - return err - } - } - return nil + return md, nil } diff --git a/processor/transformprocessor/internal/metrics/processor_test.go b/processor/transformprocessor/internal/metrics/processor_test.go index 61f059e38c00..91c76a7a3377 100644 --- a/processor/transformprocessor/internal/metrics/processor_test.go +++ b/processor/transformprocessor/internal/metrics/processor_test.go @@ -55,7 +55,7 @@ func Test_ProcessMetrics_ResourceContext(t *testing.T) { for _, tt := range tests { t.Run(tt.statement, func(t *testing.T) { td := constructMetrics() - processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "resource", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "resource", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessMetrics(context.Background(), td) @@ -90,7 +90,7 @@ func Test_ProcessMetrics_ScopeContext(t *testing.T) { for _, tt := range tests { t.Run(tt.statement, func(t *testing.T) { td := constructMetrics() - processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "scope", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "scope", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessMetrics(context.Background(), td) @@ -481,7 +481,7 @@ func Test_ProcessMetrics_DataPointContext(t *testing.T) { for _, tt := range tests { t.Run(tt.statements[0], func(t *testing.T) { td := constructMetrics() - processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "datapoint", Statements: tt.statements}}, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "datapoint", Statements: tt.statements}}, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessMetrics(context.Background(), td) @@ -613,7 +613,7 @@ func Test_ProcessMetrics_MixContext(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { td := constructMetrics() - processor, err := NewProcessor(nil, tt.contextStatments, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor(tt.contextStatments, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessMetrics(context.Background(), td) diff --git a/processor/transformprocessor/internal/traces/processor.go b/processor/transformprocessor/internal/traces/processor.go index 525ae4f20a59..11f25dc73026 100644 --- a/processor/transformprocessor/internal/traces/processor.go +++ b/processor/transformprocessor/internal/traces/processor.go @@ -21,29 +21,14 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/ptrace" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) type Processor struct { contexts []consumer.Traces - // Deprecated. Use contexts instead - statements []*ottl.Statement[ottlspan.TransformContext] } -func NewProcessor(statements []string, contextStatements []common.ContextStatements, settings component.TelemetrySettings) (*Processor, error) { - if len(statements) > 0 { - ottlp := ottlspan.NewParser(SpanFunctions(), settings) - parsedStatements, err := ottlp.ParseStatements(statements) - if err != nil { - return nil, err - } - return &Processor{ - statements: parsedStatements, - }, nil - } - +func NewProcessor(contextStatements []common.ContextStatements, settings component.TelemetrySettings) (*Processor, error) { pc, err := common.NewTraceParserCollection(settings, common.WithSpanParser(SpanFunctions()), common.WithSpanEventParser(SpanEventFunctions())) if err != nil { return nil, err @@ -64,29 +49,10 @@ func NewProcessor(statements []string, contextStatements []common.ContextStateme } func (p *Processor) ProcessTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { - if len(p.statements) > 0 { - for i := 0; i < td.ResourceSpans().Len(); i++ { - rspans := td.ResourceSpans().At(i) - for j := 0; j < rspans.ScopeSpans().Len(); j++ { - sspan := rspans.ScopeSpans().At(j) - spans := sspan.Spans() - for k := 0; k < spans.Len(); k++ { - tCtx := ottlspan.NewTransformContext(spans.At(k), sspan.Scope(), rspans.Resource()) - for _, statement := range p.statements { - _, _, err := statement.Execute(ctx, tCtx) - if err != nil { - return td, err - } - } - } - } - } - } else { - for _, c := range p.contexts { - err := c.ConsumeTraces(ctx, td) - if err != nil { - return td, err - } + for _, c := range p.contexts { + err := c.ConsumeTraces(ctx, td) + if err != nil { + return td, err } } return td, nil diff --git a/processor/transformprocessor/internal/traces/processor_test.go b/processor/transformprocessor/internal/traces/processor_test.go index b31c84269832..a42997f2784a 100644 --- a/processor/transformprocessor/internal/traces/processor_test.go +++ b/processor/transformprocessor/internal/traces/processor_test.go @@ -60,7 +60,7 @@ func Test_ProcessTraces_ResourceContext(t *testing.T) { for _, tt := range tests { t.Run(tt.statement, func(t *testing.T) { td := constructTraces() - processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "resource", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "resource", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessTraces(context.Background(), td) @@ -95,7 +95,7 @@ func Test_ProcessTraces_ScopeContext(t *testing.T) { for _, tt := range tests { t.Run(tt.statement, func(t *testing.T) { td := constructTraces() - processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "scope", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "scope", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessTraces(context.Background(), td) @@ -356,7 +356,7 @@ func Test_ProcessTraces_TraceContext(t *testing.T) { for _, tt := range tests { t.Run(tt.statement, func(t *testing.T) { td := constructTraces() - processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "span", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "span", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessTraces(context.Background(), td) @@ -386,7 +386,7 @@ func Test_ProcessTraces_SpanEventContext(t *testing.T) { for _, tt := range tests { t.Run(tt.statement, func(t *testing.T) { td := constructTraces() - processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "spanevent", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "spanevent", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessTraces(context.Background(), td) @@ -503,7 +503,7 @@ func Test_ProcessTraces_MixContext(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { td := constructTraces() - processor, err := NewProcessor(nil, tt.contextStatments, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor(tt.contextStatments, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessTraces(context.Background(), td) @@ -553,7 +553,7 @@ func BenchmarkTwoSpans(b *testing.B) { for _, tt := range tests { b.Run(tt.name, func(b *testing.B) { - processor, err := NewProcessor(tt.statements, nil, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "span", Statements: tt.statements}}, componenttest.NewNopTelemetrySettings()) assert.NoError(b, err) b.ResetTimer() for n := 0; n < b.N; n++ { @@ -595,7 +595,7 @@ func BenchmarkHundredSpans(b *testing.B) { } for _, tt := range tests { b.Run(tt.name, func(b *testing.B) { - processor, err := NewProcessor(tt.statements, nil, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "span", Statements: tt.statements}}, componenttest.NewNopTelemetrySettings()) assert.NoError(b, err) b.ResetTimer() for n := 0; n < b.N; n++ { diff --git a/processor/transformprocessor/testdata/config.yaml b/processor/transformprocessor/testdata/config.yaml index 4432c39877f8..43e54612cb2b 100644 --- a/processor/transformprocessor/testdata/config.yaml +++ b/processor/transformprocessor/testdata/config.yaml @@ -24,31 +24,6 @@ transform: statements: - set(attributes["name"], "bear") -transform/deprecated_format: - traces: - statements: - - set(name, "bear") where attributes["http.path"] == "/animal" - - keep_keys(attributes, ["http.method", "http.path"]) - metrics: - statements: - - set(metric.name, "bear") where attributes["http.path"] == "/animal" - - keep_keys(attributes, ["http.method", "http.path"]) - logs: - statements: - - set(body, "bear") where attributes["http.path"] == "/animal" - - keep_keys(attributes, ["http.method", "http.path"]) - -transform/using_both_formats: - trace_statements: - - context: span - statements: - - set(name, "bear") where attributes["http.path"] == "/animal" - - keep_keys(attributes, ["http.method", "http.path"]) - traces: - statements: - - set(name, "bear") where attributes["http.path"] == "/animal" - - keep_keys(attributes, ["http.method", "http.path"]) - transform/bad_syntax_log: log_statements: - context: log