Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pkg/ottl] Add NewStatements to each context #18385

Merged
16 changes: 16 additions & 0 deletions .chloggen/ottl-update-parsestatements.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/ottl

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add `NewStatements` func to enable creation of Statements structs.

# One or more tracking issues related to the change
issues: [18385]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
16 changes: 16 additions & 0 deletions pkg/ottl/contexts/ottldatapoint/datapoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,22 @@ func NewParser(functions map[string]interface{}, telemetrySettings component.Tel
return p
}

type StatementsOption func(*ottl.Statements[TransformContext])

func WithErrorMode(errorMode ottl.ErrorMode) StatementsOption {
return func(s *ottl.Statements[TransformContext]) {
ottl.WithErrorMode[TransformContext](errorMode)(s)
}
}

func NewStatements(statements []*ottl.Statement[TransformContext], telemetrySettings component.TelemetrySettings, options ...StatementsOption) ottl.Statements[TransformContext] {
s := ottl.NewStatements(statements, telemetrySettings)
for _, op := range options {
op(&s)
}
return s
}

var symbolTable = map[ottl.EnumSymbol]ottl.Enum{
"FLAG_NONE": 0,
"FLAG_NO_RECORDED_VALUE": 1,
Expand Down
16 changes: 16 additions & 0 deletions pkg/ottl/contexts/ottllog/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,22 @@ func NewParser(functions map[string]interface{}, telemetrySettings component.Tel
return p
}

type StatementsOption func(*ottl.Statements[TransformContext])

func WithErrorMode(errorMode ottl.ErrorMode) StatementsOption {
return func(s *ottl.Statements[TransformContext]) {
ottl.WithErrorMode[TransformContext](errorMode)(s)
}
}

func NewStatements(statements []*ottl.Statement[TransformContext], telemetrySettings component.TelemetrySettings, options ...StatementsOption) ottl.Statements[TransformContext] {
s := ottl.NewStatements(statements, telemetrySettings)
for _, op := range options {
op(&s)
}
return s
}

var symbolTable = map[ottl.EnumSymbol]ottl.Enum{
"SEVERITY_NUMBER_UNSPECIFIED": ottl.Enum(plog.SeverityNumberUnspecified),
"SEVERITY_NUMBER_TRACE": ottl.Enum(plog.SeverityNumberTrace),
Expand Down
16 changes: 16 additions & 0 deletions pkg/ottl/contexts/ottlmetric/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,22 @@ func NewParser(functions map[string]interface{}, telemetrySettings component.Tel
return p
}

type StatementsOption func(*ottl.Statements[TransformContext])

func WithErrorMode(errorMode ottl.ErrorMode) StatementsOption {
return func(s *ottl.Statements[TransformContext]) {
ottl.WithErrorMode[TransformContext](errorMode)(s)
}
}

func NewStatements(statements []*ottl.Statement[TransformContext], telemetrySettings component.TelemetrySettings, options ...StatementsOption) ottl.Statements[TransformContext] {
s := ottl.NewStatements(statements, telemetrySettings)
for _, op := range options {
op(&s)
}
return s
}

var symbolTable = ottlcommon.MetricSymbolTable

func parseEnum(val *ottl.EnumSymbol) (*ottl.Enum, error) {
Expand Down
16 changes: 16 additions & 0 deletions pkg/ottl/contexts/ottlresource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,22 @@ func NewParser(functions map[string]interface{}, telemetrySettings component.Tel
return p
}

type StatementsOption func(*ottl.Statements[TransformContext])

func WithErrorMode(errorMode ottl.ErrorMode) StatementsOption {
return func(s *ottl.Statements[TransformContext]) {
ottl.WithErrorMode[TransformContext](errorMode)(s)
}
}

func NewStatements(statements []*ottl.Statement[TransformContext], telemetrySettings component.TelemetrySettings, options ...StatementsOption) ottl.Statements[TransformContext] {
s := ottl.NewStatements(statements, telemetrySettings)
for _, op := range options {
op(&s)
}
return s
}

func parseEnum(_ *ottl.EnumSymbol) (*ottl.Enum, error) {
return nil, fmt.Errorf("resource context does not provide Enum support")
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/ottl/contexts/ottlscope/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,22 @@ func NewParser(functions map[string]interface{}, telemetrySettings component.Tel
return p
}

type StatementsOption func(*ottl.Statements[TransformContext])

func WithErrorMode(errorMode ottl.ErrorMode) StatementsOption {
return func(s *ottl.Statements[TransformContext]) {
ottl.WithErrorMode[TransformContext](errorMode)(s)
}
}

func NewStatements(statements []*ottl.Statement[TransformContext], telemetrySettings component.TelemetrySettings, options ...StatementsOption) ottl.Statements[TransformContext] {
s := ottl.NewStatements(statements, telemetrySettings)
for _, op := range options {
op(&s)
}
return s
}

func parseEnum(val *ottl.EnumSymbol) (*ottl.Enum, error) {
return nil, fmt.Errorf("instrumentation scope context does not provide Enum support")
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/ottl/contexts/ottlspan/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,22 @@ func NewParser(functions map[string]interface{}, telemetrySettings component.Tel
return p
}

type StatementsOption func(*ottl.Statements[TransformContext])

func WithErrorMode(errorMode ottl.ErrorMode) StatementsOption {
return func(s *ottl.Statements[TransformContext]) {
ottl.WithErrorMode[TransformContext](errorMode)(s)
}
}

func NewStatements(statements []*ottl.Statement[TransformContext], telemetrySettings component.TelemetrySettings, options ...StatementsOption) ottl.Statements[TransformContext] {
s := ottl.NewStatements(statements, telemetrySettings)
for _, op := range options {
op(&s)
}
return s
}

func parseEnum(val *ottl.EnumSymbol) (*ottl.Enum, error) {
if val != nil {
if enum, ok := ottlcommon.SpanSymbolTable[*val]; ok {
Expand Down
16 changes: 16 additions & 0 deletions pkg/ottl/contexts/ottlspanevent/span_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,22 @@ func NewParser(functions map[string]interface{}, telemetrySettings component.Tel
return p
}

type StatementsOption func(*ottl.Statements[TransformContext])

func WithErrorMode(errorMode ottl.ErrorMode) StatementsOption {
return func(s *ottl.Statements[TransformContext]) {
ottl.WithErrorMode[TransformContext](errorMode)(s)
}
}

func NewStatements(statements []*ottl.Statement[TransformContext], telemetrySettings component.TelemetrySettings, options ...StatementsOption) ottl.Statements[TransformContext] {
s := ottl.NewStatements(statements, telemetrySettings)
for _, op := range options {
op(&s)
}
return s
}

func parseEnum(val *ottl.EnumSymbol) (*ottl.Enum, error) {
if val != nil {
if enum, ok := ottlcommon.SpanSymbolTable[*val]; ok {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ottl/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ require (
go.opentelemetry.io/collector/component v0.71.0
go.opentelemetry.io/collector/pdata v1.0.0-rc5
go.opentelemetry.io/otel/trace v1.13.0
go.uber.org/multierr v1.9.0
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db
)
Expand All @@ -36,6 +35,7 @@ require (
go.opentelemetry.io/otel v1.13.0 // indirect
go.opentelemetry.io/otel/metric v0.36.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/net v0.5.0 // indirect
golang.org/x/sys v0.4.0 // indirect
golang.org/x/text v0.6.0 // indirect
Expand Down
65 changes: 41 additions & 24 deletions pkg/ottl/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/alecthomas/participle/v2"
"go.opentelemetry.io/collector/component"
"go.uber.org/multierr"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -94,34 +93,33 @@ func WithEnumParser[K any](parser EnumParser) Option[K] {

func (p *Parser[K]) ParseStatements(statements []string) ([]*Statement[K], error) {
var parsedStatements []*Statement[K]
var errors error

for _, statement := range statements {
parsed, err := parseStatement(statement)
if err != nil {
errors = multierr.Append(errors, err)
continue
}
function, err := p.newFunctionCall(parsed.Invocation)
ps, err := p.ParseStatement(statement)
if err != nil {
errors = multierr.Append(errors, err)
continue
return nil, err
}
expression, err := p.newBoolExpr(parsed.WhereClause)
if err != nil {
errors = multierr.Append(errors, err)
continue
}
parsedStatements = append(parsedStatements, &Statement[K]{
function: function,
condition: expression,
})
parsedStatements = append(parsedStatements, ps)
}
return parsedStatements, nil
}

if errors != nil {
return nil, errors
func (p *Parser[K]) ParseStatement(statement string) (*Statement[K], error) {
parsed, err := parseStatement(statement)
if err != nil {
return nil, err
}
return parsedStatements, nil
function, err := p.newFunctionCall(parsed.Invocation)
if err != nil {
return nil, err
}
expression, err := p.newBoolExpr(parsed.WhereClause)
if err != nil {
return nil, err
}
return &Statement[K]{
function: function,
condition: expression,
}, nil
}

var parser = newParser[parsedStatement]()
Expand Down Expand Up @@ -158,11 +156,30 @@ func newParser[G any]() *participle.Parser[G] {

// Statements represents a list of statements that will be executed sequentially for a TransformContext.
type Statements[K any] struct {
statements []Statement[K]
statements []*Statement[K]
errorMode ErrorMode
telemetrySettings component.TelemetrySettings
}

type StatementsOption[K any] func(*Statements[K])

func WithErrorMode[K any](errorMode ErrorMode) StatementsOption[K] {
return func(s *Statements[K]) {
s.errorMode = errorMode
}
}

func NewStatements[K any](statements []*Statement[K], telemetrySettings component.TelemetrySettings, options ...StatementsOption[K]) Statements[K] {
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved
s := Statements[K]{
statements: statements,
telemetrySettings: telemetrySettings,
}
for _, op := range options {
op(&s)
}
return s
}

// Execute is a function that will execute all the statements in the Statements list.
func (s *Statements[K]) Execute(ctx context.Context, tCtx K) error {
for _, statement := range s.statements {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ottl/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1363,7 +1363,7 @@ func Test_Execute_Error(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
statements := Statements[interface{}]{
statements: []Statement[interface{}]{
statements: []*Statement[interface{}]{
{
condition: BoolExpr[any]{tt.condition},
function: Expr[any]{exprFunc: tt.function},
Expand Down
9 changes: 3 additions & 6 deletions processor/routingprocessor/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,11 @@ func (r *router[E, K]) registerRouteExporters(available map[component.ID]compone
func (r *router[E, K]) getStatementFrom(item RoutingTableItem) (*ottl.Statement[K], error) {
var statement *ottl.Statement[K]
if item.Statement != "" {
statements, err := r.parser.ParseStatements([]string{item.Statement})
var err error
statement, err = r.parser.ParseStatement(item.Statement)
if err != nil {
return statement, err
return nil, err
}
if len(statements) != 1 {
return statement, errors.New("more than one statement specified")
}
statement = statements[0]
}
return statement, nil
}
Expand Down
19 changes: 10 additions & 9 deletions processor/transformprocessor/internal/common/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import (

var _ consumer.Logs = &logStatements{}

type logStatements []*ottl.Statement[ottllog.TransformContext]
type logStatements struct {
ottl.Statements[ottllog.TransformContext]
}

func (l logStatements) Capabilities() consumer.Capabilities {
return consumer.Capabilities{
Expand All @@ -45,11 +47,9 @@ func (l logStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
logs := slogs.LogRecords()
for k := 0; k < logs.Len(); k++ {
tCtx := ottllog.NewTransformContext(logs.At(k), slogs.Scope(), rlogs.Resource())
for _, statement := range l {
_, _, err := statement.Execute(ctx, tCtx)
if err != nil {
return err
}
err := l.Execute(ctx, tCtx)
if err != nil {
return err
}
}
}
Expand Down Expand Up @@ -93,13 +93,14 @@ func NewLogParserCollection(settings component.TelemetrySettings, options ...Log
func (pc LogParserCollection) ParseContextStatements(contextStatements ContextStatements) (consumer.Logs, error) {
switch contextStatements.Context {
case Log:
lStatements, err := pc.logParser.ParseStatements(contextStatements.Statements)
parseStatements, err := pc.logParser.ParseStatements(contextStatements.Statements)
if err != nil {
return nil, err
}
return logStatements(lStatements), nil
lStatements := ottllog.NewStatements(parseStatements, pc.settings, ottllog.WithErrorMode(ottl.PropagateError))
return logStatements{lStatements}, nil
default:
statements, err := pc.parseCommonContextStatements(contextStatements)
statements, err := pc.parseCommonContextStatements(contextStatements, ottl.PropagateError)
if err != nil {
return nil, err
}
Expand Down
Loading