diff --git a/.chloggen/ottl-update-parsestatements.yaml b/.chloggen/ottl-update-parsestatements.yaml new file mode 100755 index 000000000000..b29aa2a4268d --- /dev/null +++ b/.chloggen/ottl-update-parsestatements.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/ottl + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add `NewStatements` func to enable creation of Statements structs. + +# One or more tracking issues related to the change +issues: [18385] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/pkg/ottl/contexts/ottldatapoint/datapoint.go b/pkg/ottl/contexts/ottldatapoint/datapoint.go index 0c489fecc6d1..8a427c1b87e3 100644 --- a/pkg/ottl/contexts/ottldatapoint/datapoint.go +++ b/pkg/ottl/contexts/ottldatapoint/datapoint.go @@ -89,6 +89,22 @@ func NewParser(functions map[string]interface{}, telemetrySettings component.Tel return p } +type StatementsOption func(*ottl.Statements[TransformContext]) + +func WithErrorMode(errorMode ottl.ErrorMode) StatementsOption { + return func(s *ottl.Statements[TransformContext]) { + ottl.WithErrorMode[TransformContext](errorMode)(s) + } +} + +func NewStatements(statements []*ottl.Statement[TransformContext], telemetrySettings component.TelemetrySettings, options ...StatementsOption) ottl.Statements[TransformContext] { + s := ottl.NewStatements(statements, telemetrySettings) + for _, op := range options { + op(&s) + } + return s +} + var symbolTable = map[ottl.EnumSymbol]ottl.Enum{ "FLAG_NONE": 0, "FLAG_NO_RECORDED_VALUE": 1, diff --git a/pkg/ottl/contexts/ottllog/log.go b/pkg/ottl/contexts/ottllog/log.go index 4987414d9d80..c166cb25af23 100644 --- a/pkg/ottl/contexts/ottllog/log.go +++ b/pkg/ottl/contexts/ottllog/log.go @@ -78,6 +78,22 @@ func NewParser(functions map[string]interface{}, telemetrySettings component.Tel return p } +type StatementsOption func(*ottl.Statements[TransformContext]) + +func WithErrorMode(errorMode ottl.ErrorMode) StatementsOption { + return func(s *ottl.Statements[TransformContext]) { + ottl.WithErrorMode[TransformContext](errorMode)(s) + } +} + +func NewStatements(statements []*ottl.Statement[TransformContext], telemetrySettings component.TelemetrySettings, options ...StatementsOption) ottl.Statements[TransformContext] { + s := ottl.NewStatements(statements, telemetrySettings) + for _, op := range options { + op(&s) + } + return s +} + var symbolTable = map[ottl.EnumSymbol]ottl.Enum{ "SEVERITY_NUMBER_UNSPECIFIED": ottl.Enum(plog.SeverityNumberUnspecified), "SEVERITY_NUMBER_TRACE": ottl.Enum(plog.SeverityNumberTrace), diff --git a/pkg/ottl/contexts/ottlmetric/metrics.go b/pkg/ottl/contexts/ottlmetric/metrics.go index 5eecbea6ae86..0ffb64c56c07 100644 --- a/pkg/ottl/contexts/ottlmetric/metrics.go +++ b/pkg/ottl/contexts/ottlmetric/metrics.go @@ -77,6 +77,22 @@ func NewParser(functions map[string]interface{}, telemetrySettings component.Tel return p } +type StatementsOption func(*ottl.Statements[TransformContext]) + +func WithErrorMode(errorMode ottl.ErrorMode) StatementsOption { + return func(s *ottl.Statements[TransformContext]) { + ottl.WithErrorMode[TransformContext](errorMode)(s) + } +} + +func NewStatements(statements []*ottl.Statement[TransformContext], telemetrySettings component.TelemetrySettings, options ...StatementsOption) ottl.Statements[TransformContext] { + s := ottl.NewStatements(statements, telemetrySettings) + for _, op := range options { + op(&s) + } + return s +} + var symbolTable = ottlcommon.MetricSymbolTable func parseEnum(val *ottl.EnumSymbol) (*ottl.Enum, error) { diff --git a/pkg/ottl/contexts/ottlresource/resource.go b/pkg/ottl/contexts/ottlresource/resource.go index 91f872f20574..8d89f12e0a55 100644 --- a/pkg/ottl/contexts/ottlresource/resource.go +++ b/pkg/ottl/contexts/ottlresource/resource.go @@ -62,6 +62,22 @@ func NewParser(functions map[string]interface{}, telemetrySettings component.Tel return p } +type StatementsOption func(*ottl.Statements[TransformContext]) + +func WithErrorMode(errorMode ottl.ErrorMode) StatementsOption { + return func(s *ottl.Statements[TransformContext]) { + ottl.WithErrorMode[TransformContext](errorMode)(s) + } +} + +func NewStatements(statements []*ottl.Statement[TransformContext], telemetrySettings component.TelemetrySettings, options ...StatementsOption) ottl.Statements[TransformContext] { + s := ottl.NewStatements(statements, telemetrySettings) + for _, op := range options { + op(&s) + } + return s +} + func parseEnum(_ *ottl.EnumSymbol) (*ottl.Enum, error) { return nil, fmt.Errorf("resource context does not provide Enum support") } diff --git a/pkg/ottl/contexts/ottlscope/scope.go b/pkg/ottl/contexts/ottlscope/scope.go index 0bcb9c621192..82bea0cce928 100644 --- a/pkg/ottl/contexts/ottlscope/scope.go +++ b/pkg/ottl/contexts/ottlscope/scope.go @@ -69,6 +69,22 @@ func NewParser(functions map[string]interface{}, telemetrySettings component.Tel return p } +type StatementsOption func(*ottl.Statements[TransformContext]) + +func WithErrorMode(errorMode ottl.ErrorMode) StatementsOption { + return func(s *ottl.Statements[TransformContext]) { + ottl.WithErrorMode[TransformContext](errorMode)(s) + } +} + +func NewStatements(statements []*ottl.Statement[TransformContext], telemetrySettings component.TelemetrySettings, options ...StatementsOption) ottl.Statements[TransformContext] { + s := ottl.NewStatements(statements, telemetrySettings) + for _, op := range options { + op(&s) + } + return s +} + func parseEnum(val *ottl.EnumSymbol) (*ottl.Enum, error) { return nil, fmt.Errorf("instrumentation scope context does not provide Enum support") } diff --git a/pkg/ottl/contexts/ottlspan/span.go b/pkg/ottl/contexts/ottlspan/span.go index 8a03f08222dd..b5e8a118f0f4 100644 --- a/pkg/ottl/contexts/ottlspan/span.go +++ b/pkg/ottl/contexts/ottlspan/span.go @@ -76,6 +76,22 @@ func NewParser(functions map[string]interface{}, telemetrySettings component.Tel return p } +type StatementsOption func(*ottl.Statements[TransformContext]) + +func WithErrorMode(errorMode ottl.ErrorMode) StatementsOption { + return func(s *ottl.Statements[TransformContext]) { + ottl.WithErrorMode[TransformContext](errorMode)(s) + } +} + +func NewStatements(statements []*ottl.Statement[TransformContext], telemetrySettings component.TelemetrySettings, options ...StatementsOption) ottl.Statements[TransformContext] { + s := ottl.NewStatements(statements, telemetrySettings) + for _, op := range options { + op(&s) + } + return s +} + func parseEnum(val *ottl.EnumSymbol) (*ottl.Enum, error) { if val != nil { if enum, ok := ottlcommon.SpanSymbolTable[*val]; ok { diff --git a/pkg/ottl/contexts/ottlspanevent/span_events.go b/pkg/ottl/contexts/ottlspanevent/span_events.go index 85363f117e87..d6277ab7ec34 100644 --- a/pkg/ottl/contexts/ottlspanevent/span_events.go +++ b/pkg/ottl/contexts/ottlspanevent/span_events.go @@ -84,6 +84,22 @@ func NewParser(functions map[string]interface{}, telemetrySettings component.Tel return p } +type StatementsOption func(*ottl.Statements[TransformContext]) + +func WithErrorMode(errorMode ottl.ErrorMode) StatementsOption { + return func(s *ottl.Statements[TransformContext]) { + ottl.WithErrorMode[TransformContext](errorMode)(s) + } +} + +func NewStatements(statements []*ottl.Statement[TransformContext], telemetrySettings component.TelemetrySettings, options ...StatementsOption) ottl.Statements[TransformContext] { + s := ottl.NewStatements(statements, telemetrySettings) + for _, op := range options { + op(&s) + } + return s +} + func parseEnum(val *ottl.EnumSymbol) (*ottl.Enum, error) { if val != nil { if enum, ok := ottlcommon.SpanSymbolTable[*val]; ok { diff --git a/pkg/ottl/go.mod b/pkg/ottl/go.mod index f524149e4140..00317fcbd3a1 100644 --- a/pkg/ottl/go.mod +++ b/pkg/ottl/go.mod @@ -11,7 +11,6 @@ require ( go.opentelemetry.io/collector/component v0.71.0 go.opentelemetry.io/collector/pdata v1.0.0-rc5 go.opentelemetry.io/otel/trace v1.13.0 - go.uber.org/multierr v1.9.0 go.uber.org/zap v1.24.0 golang.org/x/exp v0.0.0-20221205204356-47842c84f3db ) @@ -36,6 +35,7 @@ require ( go.opentelemetry.io/otel v1.13.0 // indirect go.opentelemetry.io/otel/metric v0.36.0 // indirect go.uber.org/atomic v1.10.0 // indirect + go.uber.org/multierr v1.9.0 // indirect golang.org/x/net v0.5.0 // indirect golang.org/x/sys v0.4.0 // indirect golang.org/x/text v0.6.0 // indirect diff --git a/pkg/ottl/parser.go b/pkg/ottl/parser.go index 2dcefb199126..73779f9f637c 100644 --- a/pkg/ottl/parser.go +++ b/pkg/ottl/parser.go @@ -20,7 +20,6 @@ import ( "github.com/alecthomas/participle/v2" "go.opentelemetry.io/collector/component" - "go.uber.org/multierr" "go.uber.org/zap" ) @@ -94,34 +93,33 @@ func WithEnumParser[K any](parser EnumParser) Option[K] { func (p *Parser[K]) ParseStatements(statements []string) ([]*Statement[K], error) { var parsedStatements []*Statement[K] - var errors error - for _, statement := range statements { - parsed, err := parseStatement(statement) - if err != nil { - errors = multierr.Append(errors, err) - continue - } - function, err := p.newFunctionCall(parsed.Invocation) + ps, err := p.ParseStatement(statement) if err != nil { - errors = multierr.Append(errors, err) - continue + return nil, err } - expression, err := p.newBoolExpr(parsed.WhereClause) - if err != nil { - errors = multierr.Append(errors, err) - continue - } - parsedStatements = append(parsedStatements, &Statement[K]{ - function: function, - condition: expression, - }) + parsedStatements = append(parsedStatements, ps) } + return parsedStatements, nil +} - if errors != nil { - return nil, errors +func (p *Parser[K]) ParseStatement(statement string) (*Statement[K], error) { + parsed, err := parseStatement(statement) + if err != nil { + return nil, err } - return parsedStatements, nil + function, err := p.newFunctionCall(parsed.Invocation) + if err != nil { + return nil, err + } + expression, err := p.newBoolExpr(parsed.WhereClause) + if err != nil { + return nil, err + } + return &Statement[K]{ + function: function, + condition: expression, + }, nil } var parser = newParser[parsedStatement]() @@ -158,11 +156,30 @@ func newParser[G any]() *participle.Parser[G] { // Statements represents a list of statements that will be executed sequentially for a TransformContext. type Statements[K any] struct { - statements []Statement[K] + statements []*Statement[K] errorMode ErrorMode telemetrySettings component.TelemetrySettings } +type StatementsOption[K any] func(*Statements[K]) + +func WithErrorMode[K any](errorMode ErrorMode) StatementsOption[K] { + return func(s *Statements[K]) { + s.errorMode = errorMode + } +} + +func NewStatements[K any](statements []*Statement[K], telemetrySettings component.TelemetrySettings, options ...StatementsOption[K]) Statements[K] { + s := Statements[K]{ + statements: statements, + telemetrySettings: telemetrySettings, + } + for _, op := range options { + op(&s) + } + return s +} + // Execute is a function that will execute all the statements in the Statements list. func (s *Statements[K]) Execute(ctx context.Context, tCtx K) error { for _, statement := range s.statements { diff --git a/pkg/ottl/parser_test.go b/pkg/ottl/parser_test.go index 406c1f759e47..6a325fca0a4a 100644 --- a/pkg/ottl/parser_test.go +++ b/pkg/ottl/parser_test.go @@ -1363,7 +1363,7 @@ func Test_Execute_Error(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { statements := Statements[interface{}]{ - statements: []Statement[interface{}]{ + statements: []*Statement[interface{}]{ { condition: BoolExpr[any]{tt.condition}, function: Expr[any]{exprFunc: tt.function}, diff --git a/processor/routingprocessor/router.go b/processor/routingprocessor/router.go index 46f5a3a3654a..92462b17da05 100644 --- a/processor/routingprocessor/router.go +++ b/processor/routingprocessor/router.go @@ -132,14 +132,11 @@ func (r *router[E, K]) registerRouteExporters(available map[component.ID]compone func (r *router[E, K]) getStatementFrom(item RoutingTableItem) (*ottl.Statement[K], error) { var statement *ottl.Statement[K] if item.Statement != "" { - statements, err := r.parser.ParseStatements([]string{item.Statement}) + var err error + statement, err = r.parser.ParseStatement(item.Statement) if err != nil { return statement, err } - if len(statements) != 1 { - return statement, errors.New("more than one statement specified") - } - statement = statements[0] } return statement, nil } diff --git a/processor/transformprocessor/internal/common/logs.go b/processor/transformprocessor/internal/common/logs.go index 8267b2dcbe4d..b8b16bf631b7 100644 --- a/processor/transformprocessor/internal/common/logs.go +++ b/processor/transformprocessor/internal/common/logs.go @@ -29,7 +29,9 @@ import ( var _ consumer.Logs = &logStatements{} -type logStatements []*ottl.Statement[ottllog.TransformContext] +type logStatements struct { + ottl.Statements[ottllog.TransformContext] +} func (l logStatements) Capabilities() consumer.Capabilities { return consumer.Capabilities{ @@ -45,11 +47,9 @@ func (l logStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error { logs := slogs.LogRecords() for k := 0; k < logs.Len(); k++ { tCtx := ottllog.NewTransformContext(logs.At(k), slogs.Scope(), rlogs.Resource()) - for _, statement := range l { - _, _, err := statement.Execute(ctx, tCtx) - if err != nil { - return err - } + err := l.Execute(ctx, tCtx) + if err != nil { + return err } } } @@ -93,13 +93,14 @@ func NewLogParserCollection(settings component.TelemetrySettings, options ...Log func (pc LogParserCollection) ParseContextStatements(contextStatements ContextStatements) (consumer.Logs, error) { switch contextStatements.Context { case Log: - lStatements, err := pc.logParser.ParseStatements(contextStatements.Statements) + parsedStatements, err := pc.logParser.ParseStatements(contextStatements.Statements) if err != nil { return nil, err } - return logStatements(lStatements), nil + lStatements := ottllog.NewStatements(parsedStatements, pc.settings, ottllog.WithErrorMode(ottl.PropagateError)) + return logStatements{lStatements}, nil default: - statements, err := pc.parseCommonContextStatements(contextStatements) + statements, err := pc.parseCommonContextStatements(contextStatements, ottl.PropagateError) if err != nil { return nil, err } diff --git a/processor/transformprocessor/internal/common/metrics.go b/processor/transformprocessor/internal/common/metrics.go index 097818dffb51..fd58391ed65e 100644 --- a/processor/transformprocessor/internal/common/metrics.go +++ b/processor/transformprocessor/internal/common/metrics.go @@ -31,7 +31,9 @@ import ( var _ consumer.Metrics = &metricStatements{} -type metricStatements []*ottl.Statement[ottlmetric.TransformContext] +type metricStatements struct { + ottl.Statements[ottlmetric.TransformContext] +} func (m metricStatements) Capabilities() consumer.Capabilities { return consumer.Capabilities{ @@ -47,11 +49,9 @@ func (m metricStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics metrics := smetrics.Metrics() for k := 0; k < metrics.Len(); k++ { tCtx := ottlmetric.NewTransformContext(metrics.At(k), smetrics.Scope(), rmetrics.Resource()) - for _, statement := range m { - _, _, err := statement.Execute(ctx, tCtx) - if err != nil { - return err - } + err := m.Execute(ctx, tCtx) + if err != nil { + return err } } } @@ -61,7 +61,9 @@ func (m metricStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics var _ consumer.Metrics = &dataPointStatements{} -type dataPointStatements []*ottl.Statement[ottldatapoint.TransformContext] +type dataPointStatements struct { + ottl.Statements[ottldatapoint.TransformContext] +} func (d dataPointStatements) Capabilities() consumer.Capabilities { return consumer.Capabilities{ @@ -102,7 +104,7 @@ func (d dataPointStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metr func (d dataPointStatements) handleNumberDataPoints(ctx context.Context, dps pmetric.NumberDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { for i := 0; i < dps.Len(); i++ { tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource) - err := d.callFunctions(ctx, tCtx) + err := d.Execute(ctx, tCtx) if err != nil { return err } @@ -113,7 +115,7 @@ func (d dataPointStatements) handleNumberDataPoints(ctx context.Context, dps pme func (d dataPointStatements) handleHistogramDataPoints(ctx context.Context, dps pmetric.HistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { for i := 0; i < dps.Len(); i++ { tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource) - err := d.callFunctions(ctx, tCtx) + err := d.Execute(ctx, tCtx) if err != nil { return err } @@ -124,7 +126,7 @@ func (d dataPointStatements) handleHistogramDataPoints(ctx context.Context, dps func (d dataPointStatements) handleExponetialHistogramDataPoints(ctx context.Context, dps pmetric.ExponentialHistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { for i := 0; i < dps.Len(); i++ { tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource) - err := d.callFunctions(ctx, tCtx) + err := d.Execute(ctx, tCtx) if err != nil { return err } @@ -135,17 +137,7 @@ func (d dataPointStatements) handleExponetialHistogramDataPoints(ctx context.Con func (d dataPointStatements) handleSummaryDataPoints(ctx context.Context, dps pmetric.SummaryDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { for i := 0; i < dps.Len(); i++ { tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource) - err := d.callFunctions(ctx, tCtx) - if err != nil { - return err - } - } - return nil -} - -func (d dataPointStatements) callFunctions(ctx context.Context, tCtx ottldatapoint.TransformContext) error { - for _, statement := range d { - _, _, err := statement.Execute(ctx, tCtx) + err := d.Execute(ctx, tCtx) if err != nil { return err } @@ -197,19 +189,21 @@ func NewMetricParserCollection(settings component.TelemetrySettings, options ... func (pc MetricParserCollection) ParseContextStatements(contextStatements ContextStatements) (consumer.Metrics, error) { switch contextStatements.Context { case Metric: - mStatements, err := pc.metricParser.ParseStatements(contextStatements.Statements) + parseStatements, err := pc.metricParser.ParseStatements(contextStatements.Statements) if err != nil { return nil, err } - return metricStatements(mStatements), nil + mStatements := ottlmetric.NewStatements(parseStatements, pc.settings, ottlmetric.WithErrorMode(ottl.PropagateError)) + return metricStatements{mStatements}, nil case DataPoint: - dpStatements, err := pc.dataPointParser.ParseStatements(contextStatements.Statements) + parsedStatements, err := pc.dataPointParser.ParseStatements(contextStatements.Statements) if err != nil { return nil, err } - return dataPointStatements(dpStatements), nil + dpStatements := ottldatapoint.NewStatements(parsedStatements, pc.settings, ottldatapoint.WithErrorMode(ottl.PropagateError)) + return dataPointStatements{dpStatements}, nil default: - statements, err := pc.parseCommonContextStatements(contextStatements) + statements, err := pc.parseCommonContextStatements(contextStatements, ottl.PropagateError) if err != nil { return nil, err } diff --git a/processor/transformprocessor/internal/common/processor.go b/processor/transformprocessor/internal/common/processor.go index 9d49fb45ddb4..73b0eeee6dab 100644 --- a/processor/transformprocessor/internal/common/processor.go +++ b/processor/transformprocessor/internal/common/processor.go @@ -34,7 +34,9 @@ var _ consumer.Metrics = &resourceStatements{} var _ consumer.Logs = &resourceStatements{} var _ baseContext = &resourceStatements{} -type resourceStatements []*ottl.Statement[ottlresource.TransformContext] +type resourceStatements struct { + ottl.Statements[ottlresource.TransformContext] +} func (r resourceStatements) Capabilities() consumer.Capabilities { return consumer.Capabilities{ @@ -46,11 +48,9 @@ func (r resourceStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) for i := 0; i < td.ResourceSpans().Len(); i++ { rspans := td.ResourceSpans().At(i) tCtx := ottlresource.NewTransformContext(rspans.Resource()) - for _, statement := range r { - _, _, err := statement.Execute(ctx, tCtx) - if err != nil { - return err - } + err := r.Execute(ctx, tCtx) + if err != nil { + return err } } return nil @@ -60,11 +60,9 @@ func (r resourceStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metri for i := 0; i < md.ResourceMetrics().Len(); i++ { rmetrics := md.ResourceMetrics().At(i) tCtx := ottlresource.NewTransformContext(rmetrics.Resource()) - for _, statement := range r { - _, _, err := statement.Execute(ctx, tCtx) - if err != nil { - return err - } + err := r.Execute(ctx, tCtx) + if err != nil { + return err } } return nil @@ -74,11 +72,9 @@ func (r resourceStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error for i := 0; i < ld.ResourceLogs().Len(); i++ { rlogs := ld.ResourceLogs().At(i) tCtx := ottlresource.NewTransformContext(rlogs.Resource()) - for _, statement := range r { - _, _, err := statement.Execute(ctx, tCtx) - if err != nil { - return err - } + err := r.Execute(ctx, tCtx) + if err != nil { + return err } } return nil @@ -89,7 +85,9 @@ var _ consumer.Metrics = &scopeStatements{} var _ consumer.Logs = &scopeStatements{} var _ baseContext = &scopeStatements{} -type scopeStatements []*ottl.Statement[ottlscope.TransformContext] +type scopeStatements struct { + ottl.Statements[ottlscope.TransformContext] +} func (s scopeStatements) Capabilities() consumer.Capabilities { return consumer.Capabilities{ @@ -103,11 +101,9 @@ func (s scopeStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) er for j := 0; j < rspans.ScopeSpans().Len(); j++ { sspans := rspans.ScopeSpans().At(j) tCtx := ottlscope.NewTransformContext(sspans.Scope(), rspans.Resource()) - for _, statement := range s { - _, _, err := statement.Execute(ctx, tCtx) - if err != nil { - return err - } + err := s.Execute(ctx, tCtx) + if err != nil { + return err } } } @@ -120,11 +116,9 @@ func (s scopeStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) for j := 0; j < rmetrics.ScopeMetrics().Len(); j++ { smetrics := rmetrics.ScopeMetrics().At(j) tCtx := ottlscope.NewTransformContext(smetrics.Scope(), rmetrics.Resource()) - for _, statement := range s { - _, _, err := statement.Execute(ctx, tCtx) - if err != nil { - return err - } + err := s.Execute(ctx, tCtx) + if err != nil { + return err } } } @@ -137,11 +131,9 @@ func (s scopeStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error { for j := 0; j < rlogs.ScopeLogs().Len(); j++ { slogs := rlogs.ScopeLogs().At(j) tCtx := ottlscope.NewTransformContext(slogs.Scope(), rlogs.Resource()) - for _, statement := range s { - _, _, err := statement.Execute(ctx, tCtx) - if err != nil { - return err - } + err := s.Execute(ctx, tCtx) + if err != nil { + return err } } } @@ -160,20 +152,22 @@ type baseContext interface { consumer.Logs } -func (pc parserCollection) parseCommonContextStatements(contextStatement ContextStatements) (baseContext, error) { +func (pc parserCollection) parseCommonContextStatements(contextStatement ContextStatements, errorMode ottl.ErrorMode) (baseContext, error) { switch contextStatement.Context { case Resource: - statements, err := pc.resourceParser.ParseStatements(contextStatement.Statements) + parsedStatements, err := pc.resourceParser.ParseStatements(contextStatement.Statements) if err != nil { return nil, err } - return resourceStatements(statements), nil + rStatements := ottlresource.NewStatements(parsedStatements, pc.settings, ottlresource.WithErrorMode(errorMode)) + return resourceStatements{rStatements}, nil case Scope: - statements, err := pc.scopeParser.ParseStatements(contextStatement.Statements) + parsedStatements, err := pc.scopeParser.ParseStatements(contextStatement.Statements) if err != nil { return nil, err } - return scopeStatements(statements), nil + sStatements := ottlscope.NewStatements(parsedStatements, pc.settings, ottlscope.WithErrorMode(errorMode)) + return scopeStatements{sStatements}, nil default: return nil, fmt.Errorf("unknown context %v", contextStatement.Context) } diff --git a/processor/transformprocessor/internal/common/traces.go b/processor/transformprocessor/internal/common/traces.go index 8377cc8a7efc..4ff5dc2245bf 100644 --- a/processor/transformprocessor/internal/common/traces.go +++ b/processor/transformprocessor/internal/common/traces.go @@ -30,7 +30,9 @@ import ( var _ consumer.Traces = &traceStatements{} -type traceStatements []*ottl.Statement[ottlspan.TransformContext] +type traceStatements struct { + ottl.Statements[ottlspan.TransformContext] +} func (t traceStatements) Capabilities() consumer.Capabilities { return consumer.Capabilities{ @@ -46,11 +48,9 @@ func (t traceStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) er spans := sspans.Spans() for k := 0; k < spans.Len(); k++ { tCtx := ottlspan.NewTransformContext(spans.At(k), sspans.Scope(), rspans.Resource()) - for _, statement := range t { - _, _, err := statement.Execute(ctx, tCtx) - if err != nil { - return err - } + err := t.Execute(ctx, tCtx) + if err != nil { + return err } } } @@ -60,7 +60,9 @@ func (t traceStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) er var _ consumer.Traces = &spanEventStatements{} -type spanEventStatements []*ottl.Statement[ottlspanevent.TransformContext] +type spanEventStatements struct { + ottl.Statements[ottlspanevent.TransformContext] +} func (s spanEventStatements) Capabilities() consumer.Capabilities { return consumer.Capabilities{ @@ -79,11 +81,9 @@ func (s spanEventStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces spanEvents := span.Events() for n := 0; n < spanEvents.Len(); n++ { tCtx := ottlspanevent.NewTransformContext(spanEvents.At(n), span, sspans.Scope(), rspans.Resource()) - for _, statement := range s { - _, _, err := statement.Execute(ctx, tCtx) - if err != nil { - return err - } + err := s.Execute(ctx, tCtx) + if err != nil { + return err } } } @@ -136,18 +136,20 @@ func NewTraceParserCollection(settings component.TelemetrySettings, options ...T func (pc TraceParserCollection) ParseContextStatements(contextStatements ContextStatements) (consumer.Traces, error) { switch contextStatements.Context { case Span: - tStatements, err := pc.spanParser.ParseStatements(contextStatements.Statements) + parsedStatements, err := pc.spanParser.ParseStatements(contextStatements.Statements) if err != nil { return nil, err } - return traceStatements(tStatements), nil + sStatements := ottlspan.NewStatements(parsedStatements, pc.settings, ottlspan.WithErrorMode(ottl.PropagateError)) + return traceStatements{sStatements}, nil case SpanEvent: - seStatements, err := pc.spanEventParser.ParseStatements(contextStatements.Statements) + parsedStatements, err := pc.spanEventParser.ParseStatements(contextStatements.Statements) if err != nil { return nil, err } - return spanEventStatements(seStatements), nil + seStatements := ottlspanevent.NewStatements(parsedStatements, pc.settings, ottlspanevent.WithErrorMode(ottl.PropagateError)) + return spanEventStatements{seStatements}, nil default: - return pc.parseCommonContextStatements(contextStatements) + return pc.parseCommonContextStatements(contextStatements, ottl.PropagateError) } }