Skip to content

Commit

Permalink
[processor/transform] Move function handling logic to common and gene…
Browse files Browse the repository at this point in the history
…rify transform context. (#8972)

* Move function handling logic to common and generify transform context.

* Fix

* Keep traces functions
  • Loading branch information
anuraaga authored Apr 7, 2022
1 parent 57369ff commit 211079f
Show file tree
Hide file tree
Showing 17 changed files with 627 additions and 527 deletions.
3 changes: 2 additions & 1 deletion processor/transformprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package transformprocessor // import "github.com/open-telemetry/opentelemetry-co
import (
"go.opentelemetry.io/collector/config"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"
)

Expand All @@ -36,6 +37,6 @@ type Config struct {
var _ config.Processor = (*Config)(nil)

func (c *Config) Validate() error {
_, err := traces.Parse(c.Traces.Queries, c.Traces.functions)
_, err := common.ParseQueries(c.Traces.Queries, c.Traces.functions, traces.ParsePath)
return err
}
4 changes: 2 additions & 2 deletions processor/transformprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/service/servicetest"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

func TestLoadingConfig(t *testing.T) {
Expand All @@ -46,7 +46,7 @@ func TestLoadingConfig(t *testing.T) {
`keep_keys(attributes, "http.method", "http.path")`,
},

functions: traces.DefaultFunctions(),
functions: common.DefaultFunctions(),
},
})
}
Expand Down
4 changes: 2 additions & 2 deletions processor/transformprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/model/pdata"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

func TestFactory_Type(t *testing.T) {
Expand All @@ -41,7 +41,7 @@ func TestFactory_CreateDefaultConfig(t *testing.T) {
Traces: TracesConfig{
Queries: []string{},

functions: traces.DefaultFunctions(),
functions: common.DefaultFunctions(),
},
})
assert.NoError(t, configtest.CheckConfigStruct(cfg))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,45 +12,43 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package traces // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"
package common // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"

import (
"fmt"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

type condFunc = func(ctx spanTransformContext) bool
type condFunc = func(ctx TransformContext) bool

var alwaysTrue = func(ctx spanTransformContext) bool {
var alwaysTrue = func(ctx TransformContext) bool {
return true
}

func newConditionEvaluator(cond *common.Condition, functions map[string]interface{}) (condFunc, error) {
func newConditionEvaluator(cond *Condition, functions map[string]interface{}, pathParser PathExpressionParser) (condFunc, error) {
if cond == nil {
return alwaysTrue, nil
}
left, err := newGetter(cond.Left, functions)
left, err := NewGetter(cond.Left, functions, pathParser)
if err != nil {
return nil, err
}
right, err := newGetter(cond.Right, functions)
right, err := NewGetter(cond.Right, functions, pathParser)
// TODO(anuraaga): Check if both left and right are literals and const-evaluate
if err != nil {
return nil, err
}

switch cond.Op {
case "==":
return func(ctx spanTransformContext) bool {
a := left.get(ctx)
b := right.get(ctx)
return func(ctx TransformContext) bool {
a := left.Get(ctx)
b := right.Get(ctx)
return a == b
}, nil
case "!=":
return func(ctx spanTransformContext) bool {
a := left.get(ctx)
b := right.get(ctx)
return func(ctx TransformContext) bool {
a := left.Get(ctx)
b := right.Get(ctx)
return a != b
}, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,31 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package traces
package common

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/model/pdata"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

func Test_newConditionEvaluator(t *testing.T) {
span := pdata.NewSpan()
span.SetName("bear")
tests := []struct {
name string
cond *common.Condition
cond *Condition
matching pdata.Span
}{
{
name: "literals match",
cond: &common.Condition{
Left: common.Value{
cond: &Condition{
Left: Value{
String: strp("hello"),
},
Right: common.Value{
Right: Value{
String: strp("hello"),
},
Op: "==",
Expand All @@ -46,11 +45,11 @@ func Test_newConditionEvaluator(t *testing.T) {
},
{
name: "literals don't match",
cond: &common.Condition{
Left: common.Value{
cond: &Condition{
Left: Value{
String: strp("hello"),
},
Right: common.Value{
Right: Value{
String: strp("goodbye"),
},
Op: "!=",
Expand All @@ -59,17 +58,17 @@ func Test_newConditionEvaluator(t *testing.T) {
},
{
name: "path expression matches",
cond: &common.Condition{
Left: common.Value{
Path: &common.Path{
Fields: []common.Field{
cond: &Condition{
Left: Value{
Path: &Path{
Fields: []Field{
{
Name: "name",
},
},
},
},
Right: common.Value{
Right: Value{
String: strp("bear"),
},
Op: "==",
Expand All @@ -78,17 +77,17 @@ func Test_newConditionEvaluator(t *testing.T) {
},
{
name: "path expression not matches",
cond: &common.Condition{
Left: common.Value{
Path: &common.Path{
Fields: []common.Field{
cond: &Condition{
Left: Value{
Path: &Path{
Fields: []Field{
{
Name: "name",
},
},
},
},
Right: common.Value{
Right: Value{
String: strp("cat"),
},
Op: "!=",
Expand All @@ -103,9 +102,9 @@ func Test_newConditionEvaluator(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
evaluate, err := newConditionEvaluator(tt.cond, DefaultFunctions())
evaluate, err := newConditionEvaluator(tt.cond, DefaultFunctions(), testParsePath)
assert.NoError(t, err)
assert.True(t, evaluate(spanTransformContext{
assert.True(t, evaluate(testTransformContext{
span: tt.matching,
il: pdata.NewInstrumentationScope(),
resource: pdata.NewResource(),
Expand All @@ -114,27 +113,63 @@ func Test_newConditionEvaluator(t *testing.T) {
}

t.Run("invalid", func(t *testing.T) {
_, err := newConditionEvaluator(&common.Condition{
Left: common.Value{
_, err := newConditionEvaluator(&Condition{
Left: Value{
String: strp("bear"),
},
Op: "<>",
Right: common.Value{
Right: Value{
String: strp("cat"),
},
}, DefaultFunctions())
}, DefaultFunctions(), testParsePath)
assert.Error(t, err)
})
}

func strp(s string) *string {
return &s
// Small copy of traces data model for use in common tests

type testTransformContext struct {
span pdata.Span
il pdata.InstrumentationScope
resource pdata.Resource
}

func (ctx testTransformContext) GetItem() interface{} {
return ctx.span
}

func (ctx testTransformContext) GetInstrumentationScope() pdata.InstrumentationScope {
return ctx.il
}

func (ctx testTransformContext) GetResource() pdata.Resource {
return ctx.resource
}

func intp(i int64) *int64 {
return &i
// pathGetSetter is a getSetter which has been resolved using a path expression provided by a user.
type testGetSetter struct {
getter ExprFunc
setter func(ctx TransformContext, val interface{})
}

func floatp(f float64) *float64 {
return &f
func (path testGetSetter) Get(ctx TransformContext) interface{} {
return path.getter(ctx)
}

func (path testGetSetter) Set(ctx TransformContext, val interface{}) {
path.setter(ctx, val)
}

func testParsePath(val *Path) (GetSetter, error) {
if val != nil && len(val.Fields) > 0 && val.Fields[0].Name == "name" {
return &testGetSetter{
getter: func(ctx TransformContext) interface{} {
return ctx.GetItem().(pdata.Span).Name()
},
setter: func(ctx TransformContext, val interface{}) {
ctx.GetItem().(pdata.Span).SetName(val.(string))
},
}, nil
}
return nil, fmt.Errorf("bad path %v", val)
}
86 changes: 86 additions & 0 deletions processor/transformprocessor/internal/common/expression.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"

import (
"fmt"

"go.opentelemetry.io/collector/model/pdata"
)

type TransformContext interface {
GetItem() interface{}
GetInstrumentationScope() pdata.InstrumentationScope
GetResource() pdata.Resource
}

type ExprFunc func(ctx TransformContext) interface{}

type Getter interface {
Get(ctx TransformContext) interface{}
}

type Setter interface {
Set(ctx TransformContext, val interface{})
}

type GetSetter interface {
Getter
Setter
}

type literal struct {
value interface{}
}

func (l literal) Get(ctx TransformContext) interface{} {
return l.value
}

type exprGetter struct {
expr ExprFunc
}

func (g exprGetter) Get(ctx TransformContext) interface{} {
return g.expr(ctx)
}

func NewGetter(val Value, functions map[string]interface{}, pathParser PathExpressionParser) (Getter, error) {
if s := val.String; s != nil {
return &literal{value: *s}, nil
}
if f := val.Float; f != nil {
return &literal{value: *f}, nil
}
if i := val.Int; i != nil {
return &literal{value: *i}, nil
}

if val.Path != nil {
return pathParser(val.Path)
}

if val.Invocation == nil {
// In practice, can't happen since the DSL grammar guarantees one is set
return nil, fmt.Errorf("no value field set. This is a bug in the transformprocessor")
}
call, err := NewFunctionCall(*val.Invocation, functions, pathParser)
if err != nil {
return nil, err
}
return &exprGetter{
expr: call,
}, nil
}
Loading

0 comments on commit 211079f

Please sign in to comment.