Skip to content

Commit

Permalink
[processor/transform] Remove old config options (#16773)
Browse files Browse the repository at this point in the history
* remove deprecated config options
  • Loading branch information
TylerHelmuth authored Jan 6, 2023
1 parent 55298b9 commit 3fadc47
Show file tree
Hide file tree
Showing 12 changed files with 85 additions and 336 deletions.
16 changes: 16 additions & 0 deletions .chloggen/tp-remove-old-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: transformprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Remove deprecated config options. Use `[trace|metric|log]_statements` instead.

# One or more tracking issues related to the change
issues: [16773]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
37 changes: 0 additions & 37 deletions processor/transformprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,9 @@
package transformprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor"

import (
"fmt"

"go.opentelemetry.io/collector/component"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"
Expand All @@ -33,9 +28,6 @@ type Config struct {
TraceStatements []common.ContextStatements `mapstructure:"trace_statements"`
MetricStatements []common.ContextStatements `mapstructure:"metric_statements"`
LogStatements []common.ContextStatements `mapstructure:"log_statements"`

// Deprecated. Use TraceStatements, MetricStatements, and LogStatements instead
OTTLConfig `mapstructure:",squash"`
}

type OTTLConfig struct {
Expand All @@ -51,19 +43,6 @@ type SignalConfig struct {
var _ component.Config = (*Config)(nil)

func (c *Config) Validate() error {
if (len(c.Traces.Statements) > 0 || len(c.Metrics.Statements) > 0 || len(c.Logs.Statements) > 0) &&
(len(c.TraceStatements) > 0 || len(c.MetricStatements) > 0 || len(c.LogStatements) > 0) {
return fmt.Errorf("cannot use Traces, Metrics and/or Logs with TraceStatements, MetricStatements and/or LogStatements")
}

if len(c.Traces.Statements) > 0 {
ottlspanp := ottlspan.NewParser(traces.SpanFunctions(), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := ottlspanp.ParseStatements(c.Traces.Statements)
if err != nil {
return err
}
}

if len(c.TraceStatements) > 0 {
pc, err := common.NewTraceParserCollection(component.TelemetrySettings{Logger: zap.NewNop()}, common.WithSpanParser(traces.SpanFunctions()), common.WithSpanEventParser(traces.SpanEventFunctions()))
if err != nil {
Expand All @@ -77,14 +56,6 @@ func (c *Config) Validate() error {
}
}

if len(c.Metrics.Statements) > 0 {
ottlmetricsp := ottldatapoint.NewParser(metrics.DataPointFunctions(), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := ottlmetricsp.ParseStatements(c.Metrics.Statements)
if err != nil {
return err
}
}

if len(c.MetricStatements) > 0 {
pc, err := common.NewMetricParserCollection(component.TelemetrySettings{Logger: zap.NewNop()}, common.WithMetricParser(metrics.MetricFunctions()), common.WithDataPointParser(metrics.DataPointFunctions()))
if err != nil {
Expand All @@ -98,14 +69,6 @@ func (c *Config) Validate() error {
}
}

if len(c.Logs.Statements) > 0 {
ottllogsp := ottllog.NewParser(logs.LogFunctions(), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := ottllogsp.ParseStatements(c.Logs.Statements)
if err != nil {
return err
}
}

if len(c.LogStatements) > 0 {
pc, err := common.NewLogParserCollection(component.TelemetrySettings{Logger: zap.NewNop()}, common.WithLogParser(logs.LogFunctions()))
if err != nil {
Expand Down
43 changes: 0 additions & 43 deletions processor/transformprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,6 @@ func TestLoadConfig(t *testing.T) {
{
id: component.NewIDWithName(typeStr, ""),
expected: &Config{
OTTLConfig: OTTLConfig{
Traces: SignalConfig{
Statements: []string{},
},
Metrics: SignalConfig{
Statements: []string{},
},
Logs: SignalConfig{
Statements: []string{},
},
},
TraceStatements: []common.ContextStatements{
{
Context: "span",
Expand Down Expand Up @@ -94,38 +83,6 @@ func TestLoadConfig(t *testing.T) {
},
},
},
{
id: component.NewIDWithName(typeStr, "deprecated_format"),
expected: &Config{
OTTLConfig: OTTLConfig{
Traces: SignalConfig{
Statements: []string{
`set(name, "bear") where attributes["http.path"] == "/animal"`,
`keep_keys(attributes, ["http.method", "http.path"])`,
},
},
Metrics: SignalConfig{
Statements: []string{
`set(metric.name, "bear") where attributes["http.path"] == "/animal"`,
`keep_keys(attributes, ["http.method", "http.path"])`,
},
},
Logs: SignalConfig{
Statements: []string{
`set(body, "bear") where attributes["http.path"] == "/animal"`,
`keep_keys(attributes, ["http.method", "http.path"])`,
},
},
},
TraceStatements: []common.ContextStatements{},
MetricStatements: []common.ContextStatements{},
LogStatements: []common.ContextStatements{},
},
},
{
id: component.NewIDWithName(typeStr, "using_both_formats"),
errorMessage: "cannot use Traces, Metrics and/or Logs with TraceStatements, MetricStatements and/or LogStatements",
},
{
id: component.NewIDWithName(typeStr, "bad_syntax_trace"),
errorMessage: "unable to parse OTTL statement: 1:18: unexpected token \"where\" (expected \")\")",
Expand Down
17 changes: 3 additions & 14 deletions processor/transformprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,6 @@ func NewFactory() processor.Factory {

func createDefaultConfig() component.Config {
return &Config{
OTTLConfig: OTTLConfig{
Logs: SignalConfig{
Statements: []string{},
},
Traces: SignalConfig{
Statements: []string{},
},
Metrics: SignalConfig{
Statements: []string{},
},
},
TraceStatements: []common.ContextStatements{},
MetricStatements: []common.ContextStatements{},
LogStatements: []common.ContextStatements{},
Expand All @@ -73,7 +62,7 @@ func createLogsProcessor(
) (processor.Logs, error) {
oCfg := cfg.(*Config)

proc, err := logs.NewProcessor(oCfg.Logs.Statements, oCfg.LogStatements, set.TelemetrySettings)
proc, err := logs.NewProcessor(oCfg.LogStatements, set.TelemetrySettings)
if err != nil {
return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err)
}
Expand All @@ -94,7 +83,7 @@ func createTracesProcessor(
) (processor.Traces, error) {
oCfg := cfg.(*Config)

proc, err := traces.NewProcessor(oCfg.Traces.Statements, oCfg.TraceStatements, set.TelemetrySettings)
proc, err := traces.NewProcessor(oCfg.TraceStatements, set.TelemetrySettings)
if err != nil {
return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err)
}
Expand All @@ -115,7 +104,7 @@ func createMetricsProcessor(
) (processor.Metrics, error) {
oCfg := cfg.(*Config)

proc, err := metrics.NewProcessor(oCfg.Metrics.Statements, oCfg.MetricStatements, set.TelemetrySettings)
proc, err := metrics.NewProcessor(oCfg.MetricStatements, set.TelemetrySettings)
if err != nil {
return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err)
}
Expand Down
56 changes: 36 additions & 20 deletions processor/transformprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,6 @@ func TestFactory_CreateDefaultConfig(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.Equal(t, cfg, &Config{
OTTLConfig: OTTLConfig{
Traces: SignalConfig{
Statements: []string{},
},
Metrics: SignalConfig{
Statements: []string{},
},
Logs: SignalConfig{
Statements: []string{},
},
},
TraceStatements: []common.ContextStatements{},
MetricStatements: []common.ContextStatements{},
LogStatements: []common.ContextStatements{},
Expand All @@ -68,7 +57,12 @@ func TestFactoryCreateTracesProcessor_InvalidActions(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
oCfg := cfg.(*Config)
oCfg.Traces.Statements = []string{`set(123`}
oCfg.TraceStatements = []common.ContextStatements{
{
Context: "span",
Statements: []string{`set(123`},
},
}
ap, err := factory.CreateTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
assert.Error(t, err)
assert.Nil(t, ap)
Expand All @@ -78,8 +72,12 @@ func TestFactoryCreateTracesProcessor(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
oCfg := cfg.(*Config)
oCfg.Traces.Statements = []string{`set(attributes["test"], "pass") where name == "operationA"`}

oCfg.TraceStatements = []common.ContextStatements{
{
Context: "span",
Statements: []string{`set(attributes["test"], "pass") where name == "operationA"`},
},
}
tp, err := factory.CreateTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
assert.NotNil(t, tp)
assert.NoError(t, err)
Expand All @@ -103,7 +101,12 @@ func TestFactoryCreateMetricsProcessor_InvalidActions(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
oCfg := cfg.(*Config)
oCfg.Metrics.Statements = []string{`set(123`}
oCfg.MetricStatements = []common.ContextStatements{
{
Context: "datapoint",
Statements: []string{`set(123`},
},
}
ap, err := factory.CreateMetricsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
assert.Error(t, err)
assert.Nil(t, ap)
Expand All @@ -113,8 +116,12 @@ func TestFactoryCreateMetricsProcessor(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
oCfg := cfg.(*Config)
oCfg.Metrics.Statements = []string{`set(attributes["test"], "pass") where metric.name == "operationA"`}

oCfg.MetricStatements = []common.ContextStatements{
{
Context: "datapoint",
Statements: []string{`set(attributes["test"], "pass") where metric.name == "operationA"`},
},
}
metricsProcessor, err := factory.CreateMetricsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
assert.NotNil(t, metricsProcessor)
assert.NoError(t, err)
Expand All @@ -138,8 +145,12 @@ func TestFactoryCreateLogsProcessor(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
oCfg := cfg.(*Config)
oCfg.Logs.Statements = []string{`set(attributes["test"], "pass") where body == "operationA"`}

oCfg.LogStatements = []common.ContextStatements{
{
Context: "log",
Statements: []string{`set(attributes["test"], "pass") where body == "operationA"`},
},
}
lp, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
assert.NotNil(t, lp)
assert.NoError(t, err)
Expand All @@ -163,7 +174,12 @@ func TestFactoryCreateLogsProcessor_InvalidActions(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
oCfg := cfg.(*Config)
oCfg.Logs.Statements = []string{`set(123`}
oCfg.LogStatements = []common.ContextStatements{
{
Context: "log",
Statements: []string{`set(123`},
},
}
ap, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
assert.Error(t, err)
assert.Nil(t, ap)
Expand Down
44 changes: 5 additions & 39 deletions processor/transformprocessor/internal/logs/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,14 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

type Processor struct {
contexts []consumer.Logs
// Deprecated. Use contexts instead
statements []*ottl.Statement[ottllog.TransformContext]
}

func NewProcessor(statements []string, contextStatements []common.ContextStatements, settings component.TelemetrySettings) (*Processor, error) {
if len(statements) > 0 {
ottlp := ottllog.NewParser(LogFunctions(), settings)
parsedStatements, err := ottlp.ParseStatements(statements)
if err != nil {
return nil, err
}
return &Processor{
statements: parsedStatements,
}, nil
}

func NewProcessor(contextStatements []common.ContextStatements, settings component.TelemetrySettings) (*Processor, error) {
pc, err := common.NewLogParserCollection(settings, common.WithLogParser(LogFunctions()))
if err != nil {
return nil, err
Expand All @@ -64,29 +49,10 @@ func NewProcessor(statements []string, contextStatements []common.ContextStateme
}

func (p *Processor) ProcessLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) {
if len(p.statements) > 0 {
for i := 0; i < ld.ResourceLogs().Len(); i++ {
rlogs := ld.ResourceLogs().At(i)
for j := 0; j < rlogs.ScopeLogs().Len(); j++ {
slogs := rlogs.ScopeLogs().At(j)
logs := slogs.LogRecords()
for k := 0; k < logs.Len(); k++ {
tCtx := ottllog.NewTransformContext(logs.At(k), slogs.Scope(), rlogs.Resource())
for _, statement := range p.statements {
_, _, err := statement.Execute(ctx, tCtx)
if err != nil {
return ld, err
}
}
}
}
}
} else {
for _, c := range p.contexts {
err := c.ConsumeLogs(ctx, ld)
if err != nil {
return ld, err
}
for _, c := range p.contexts {
err := c.ConsumeLogs(ctx, ld)
if err != nil {
return ld, err
}
}
return ld, nil
Expand Down
8 changes: 4 additions & 4 deletions processor/transformprocessor/internal/logs/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func Test_ProcessLogs_ResourceContext(t *testing.T) {
for _, tt := range tests {
t.Run(tt.statement, func(t *testing.T) {
td := constructLogs()
processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "resource", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings())
processor, err := NewProcessor([]common.ContextStatements{{Context: "resource", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)

_, err = processor.ProcessLogs(context.Background(), td)
Expand Down Expand Up @@ -94,7 +94,7 @@ func Test_ProcessLogs_ScopeContext(t *testing.T) {
for _, tt := range tests {
t.Run(tt.statement, func(t *testing.T) {
td := constructLogs()
processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "scope", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings())
processor, err := NewProcessor([]common.ContextStatements{{Context: "scope", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)

_, err = processor.ProcessLogs(context.Background(), td)
Expand Down Expand Up @@ -309,7 +309,7 @@ func Test_ProcessLogs_LogContext(t *testing.T) {
for _, tt := range tests {
t.Run(tt.statement, func(t *testing.T) {
td := constructLogs()
processor, err := NewProcessor(nil, []common.ContextStatements{{Context: "log", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings())
processor, err := NewProcessor([]common.ContextStatements{{Context: "log", Statements: []string{tt.statement}}}, componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)

_, err = processor.ProcessLogs(context.Background(), td)
Expand Down Expand Up @@ -426,7 +426,7 @@ func Test_ProcessLogs_MixContext(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
td := constructLogs()
processor, err := NewProcessor(nil, tt.contextStatments, componenttest.NewNopTelemetrySettings())
processor, err := NewProcessor(tt.contextStatments, componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)

_, err = processor.ProcessLogs(context.Background(), td)
Expand Down
Loading

0 comments on commit 3fadc47

Please sign in to comment.