diff --git a/CHANGELOG.md b/CHANGELOG.md index 2fbd37bfb1f0..91eb41fd1848 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ ### 💡 Enhancements 💡 +- `transformprocessor`: Add byte slice literal to the grammar. Add new SpanID and TraceID functions that take a byte slice and return a Span/Trace ID. (#10487) - `elasticsearchreceiver`: Add integration test for elasticsearch receiver (#10165) ### 🧰 Bug fixes 🧰 diff --git a/processor/transformprocessor/README.md b/processor/transformprocessor/README.md index 9be2f27eee96..c720b959a78d 100644 --- a/processor/transformprocessor/README.md +++ b/processor/transformprocessor/README.md @@ -17,11 +17,15 @@ in the OTLP protobuf definition. e.g., `status.code`, `attributes["http.method"] - Metric data types are `None`, `Gauge`, `Sum`, `Histogram`, `ExponentialHistogram`, and `Summary` - `aggregation_temporality` is converted to and from the [protobuf's numeric definition](https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/metrics/v1/metrics.proto#L291). Interact with this field using 0, 1, or 2. - Until the grammar can handle booleans, `is_monotic` is handled via strings the strings `"true"` and `"false"`. -- Literals: Strings, ints, and floats can be referenced as literal values +- Literals: Strings, ints, and floats can be referenced as literal values. Byte slices can be references as a literal value via a hex string prefaced with `0x`, such as `0x0001`. - Function invocations: Functions can be invoked with arguments matching the function's expected arguments - Where clause: Telemetry to modify can be filtered by appending `where a b`, with `a` and `b` being any of the above. Supported functions: +- `SpanID(bytes)` - `bytes` is a byte slice of exactly 8 bytes. The function returns a SpanID from `bytes`. e.g., `SpanID(0x0000000000000000)` + +- `TraceID(bytes)` - `bytes` is a byte slice of exactly 16 bytes. The function returns a TraceID from `bytes`. e.g., `TraceID(0x00000000000000000000000000000000)` + - `set(target, value)` - `target` is a path expression to a telemetry field to set `value` into. `value` is any value type. e.g., `set(attributes["http.path"], "/foo")`, `set(name, attributes["http.route"])`. If `value` resolves to `nil`, e.g. it references an unset map value, there will be no action. diff --git a/processor/transformprocessor/internal/common/expression.go b/processor/transformprocessor/internal/common/expression.go index 245b81117411..f3583b078c97 100644 --- a/processor/transformprocessor/internal/common/expression.go +++ b/processor/transformprocessor/internal/common/expression.go @@ -67,6 +67,9 @@ func NewGetter(val Value, functions map[string]interface{}, pathParser PathExpre if i := val.Int; i != nil { return &literal{value: *i}, nil } + if b := val.Bytes; b != nil { + return &literal{value: ([]byte)(*b)}, nil + } if val.Path != nil { return pathParser(val.Path) diff --git a/processor/transformprocessor/internal/common/expression_test.go b/processor/transformprocessor/internal/common/expression_test.go index 875f6db1e0c4..aaa9228e283c 100644 --- a/processor/transformprocessor/internal/common/expression_test.go +++ b/processor/transformprocessor/internal/common/expression_test.go @@ -55,6 +55,13 @@ func Test_newGetter(t *testing.T) { }, want: int64(12), }, + { + name: "bytes literal", + val: Value{ + Bytes: (*Bytes)(&[]byte{1, 2, 3, 4, 5, 6, 7, 8}), + }, + want: []byte{1, 2, 3, 4, 5, 6, 7, 8}, + }, { name: "path expression", val: Value{ diff --git a/processor/transformprocessor/internal/common/func_span_id.go b/processor/transformprocessor/internal/common/func_span_id.go new file mode 100644 index 000000000000..2c03feabd823 --- /dev/null +++ b/processor/transformprocessor/internal/common/func_span_id.go @@ -0,0 +1,33 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" + +import ( + "errors" + + "go.opentelemetry.io/collector/pdata/pcommon" +) + +func spanID(bytes []byte) (ExprFunc, error) { + if len(bytes) != 8 { + return nil, errors.New("span ids must be 8 bytes") + } + var idArr [8]byte + copy(idArr[:8], bytes) + id := pcommon.NewSpanID(idArr) + return func(ctx TransformContext) interface{} { + return id + }, nil +} diff --git a/processor/transformprocessor/internal/common/func_span_id_test.go b/processor/transformprocessor/internal/common/func_span_id_test.go new file mode 100644 index 000000000000..7f0f7785ef3a --- /dev/null +++ b/processor/transformprocessor/internal/common/func_span_id_test.go @@ -0,0 +1,71 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common/testhelper" +) + +func Test_spanID(t *testing.T) { + tests := []struct { + name string + bytes []byte + want pcommon.SpanID + }{ + { + name: "create span id", + bytes: []byte{1, 2, 3, 4, 5, 6, 7, 8}, + want: pcommon.NewSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + ctx := testhelper.TestTransformContext{} + + exprFunc, _ := spanID(tt.bytes) + actual := exprFunc(ctx) + + assert.Equal(t, tt.want, actual) + }) + } +} + +func Test_spanID_validation(t *testing.T) { + tests := []struct { + name string + bytes []byte + }{ + { + name: "byte slice less than 8", + bytes: []byte{1, 2, 3, 4, 5, 6, 7}, + }, + { + name: "byte slice longer than 8", + bytes: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := traceID(tt.bytes) + assert.Error(t, err, "span ids must be 8 bytes") + }) + } +} diff --git a/processor/transformprocessor/internal/common/func_trace_id.go b/processor/transformprocessor/internal/common/func_trace_id.go new file mode 100644 index 000000000000..933b50860ea3 --- /dev/null +++ b/processor/transformprocessor/internal/common/func_trace_id.go @@ -0,0 +1,33 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" + +import ( + "errors" + + "go.opentelemetry.io/collector/pdata/pcommon" +) + +func traceID(bytes []byte) (ExprFunc, error) { + if len(bytes) != 16 { + return nil, errors.New("traces ids must be 16 bytes") + } + var idArr [16]byte + copy(idArr[:16], bytes) + id := pcommon.NewTraceID(idArr) + return func(ctx TransformContext) interface{} { + return id + }, nil +} diff --git a/processor/transformprocessor/internal/common/func_trace_id_test.go b/processor/transformprocessor/internal/common/func_trace_id_test.go new file mode 100644 index 000000000000..cf85776c7d2a --- /dev/null +++ b/processor/transformprocessor/internal/common/func_trace_id_test.go @@ -0,0 +1,71 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common/testhelper" +) + +func Test_traceID(t *testing.T) { + tests := []struct { + name string + bytes []byte + want pcommon.TraceID + }{ + { + name: "create trace id", + bytes: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + want: pcommon.NewTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + ctx := testhelper.TestTransformContext{} + + exprFunc, _ := traceID(tt.bytes) + actual := exprFunc(ctx) + + assert.Equal(t, tt.want, actual) + }) + } +} + +func Test_traceID_validation(t *testing.T) { + tests := []struct { + name string + bytes []byte + }{ + { + name: "byte slice less than 16", + bytes: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + }, + { + name: "byte slice longer than 16", + bytes: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := traceID(tt.bytes) + assert.Error(t, err, "traces ids must be 16 bytes") + }) + } +} diff --git a/processor/transformprocessor/internal/common/functions.go b/processor/transformprocessor/internal/common/functions.go index 1bfbc377b8d5..4984d5058370 100644 --- a/processor/transformprocessor/internal/common/functions.go +++ b/processor/transformprocessor/internal/common/functions.go @@ -20,6 +20,8 @@ import ( ) var registry = map[string]interface{}{ + "TraceID": traceID, + "SpanID": spanID, "keep_keys": keepKeys, "set": set, "truncate_all": truncateAll, @@ -109,6 +111,11 @@ func buildSliceArg(inv Invocation, argType reflect.Type, startingIndex int, args arg = append(arg, *inv.Arguments[j].Int) } *args = append(*args, reflect.ValueOf(arg)) + case reflect.Uint8: + if inv.Arguments[startingIndex].Bytes == nil { + return fmt.Errorf("invalid argument for slice parameter at position %v, must be a byte slice literal", startingIndex) + } + *args = append(*args, reflect.ValueOf(([]byte)(*inv.Arguments[startingIndex].Bytes))) default: return fmt.Errorf("unsupported slice type for function %v", inv.Function) } diff --git a/processor/transformprocessor/internal/common/functions_test.go b/processor/transformprocessor/internal/common/functions_test.go index 652668d91627..78fe55990749 100644 --- a/processor/transformprocessor/internal/common/functions_test.go +++ b/processor/transformprocessor/internal/common/functions_test.go @@ -30,6 +30,7 @@ func Test_NewFunctionCall_invalid(t *testing.T) { functions["testing_getter"] = functionWithGetter functions["testing_multiple_args"] = functionWithMultipleArgs functions["testing_string"] = functionWithString + functions["testing_byte_slice"] = functionWithByteSlice tests := []struct { name string @@ -97,6 +98,23 @@ func Test_NewFunctionCall_invalid(t *testing.T) { }, }, }, + { + name: "not matching arg type when byte slice", + inv: Invocation{ + Function: "testing_byte_slice", + Arguments: []Value{ + { + String: testhelper.Strp("test"), + }, + { + String: testhelper.Strp("test"), + }, + { + String: testhelper.Strp("test"), + }, + }, + }, + }, { name: "function call returns error", inv: Invocation{ @@ -117,6 +135,7 @@ func Test_NewFunctionCall(t *testing.T) { functions["testing_string_slice"] = functionWithStringSlice functions["testing_float_slice"] = functionWithFloatSlice functions["testing_int_slice"] = functionWithIntSlice + functions["testing_byte_slice"] = functionWithByteSlice functions["testing_setter"] = functionWithSetter functions["testing_getsetter"] = functionWithGetSetter functions["testing_getter"] = functionWithGetter @@ -276,6 +295,17 @@ func Test_NewFunctionCall(t *testing.T) { }, }, }, + { + name: "bytes arg", + inv: Invocation{ + Function: "testing_byte_slice", + Arguments: []Value{ + { + Bytes: (*Bytes)(&[]byte{1, 2, 3, 4, 5, 6, 7, 8}), + }, + }, + }, + }, { name: "multiple args", inv: Invocation{ @@ -335,6 +365,12 @@ func functionWithIntSlice(_ []int64) (ExprFunc, error) { }, nil } +func functionWithByteSlice(_ []byte) (ExprFunc, error) { + return func(ctx TransformContext) interface{} { + return "anything" + }, nil +} + func functionWithSetter(_ Setter) (ExprFunc, error) { return func(ctx TransformContext) interface{} { return "anything" diff --git a/processor/transformprocessor/internal/common/parser.go b/processor/transformprocessor/internal/common/parser.go index 7bc3ed109f26..a162fa906a9b 100644 --- a/processor/transformprocessor/internal/common/parser.go +++ b/processor/transformprocessor/internal/common/parser.go @@ -15,20 +15,15 @@ package common // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" import ( + "encoding/hex" + "errors" + "github.com/alecthomas/participle/v2" "github.com/alecthomas/participle/v2/lexer" + "go.opentelemetry.io/collector/pdata/pcommon" "go.uber.org/multierr" ) -// Type for capturing booleans, see: -// https://github.com/alecthomas/participle#capturing-boolean-value -type Boolean bool - -func (b *Boolean) Capture(values []string) error { - *b = values[0] == "true" - return nil -} - // ParsedQuery represents a parsed query. It is the entry point into the query DSL. // nolint:govet type ParsedQuery struct { @@ -56,6 +51,7 @@ type Invocation struct { // nolint:govet type Value struct { Invocation *Invocation `( @@` + Bytes *Bytes `| @Bytes` String *string `| @String` Float *float64 `| @Float` Int *int64 `| @Int` @@ -83,6 +79,28 @@ type Query struct { Condition condFunc } +// Bytes type for capturing byte arrays +type Bytes []byte + +func (b *Bytes) Capture(values []string) error { + rawStr := values[0][2:] + bytes, err := hex.DecodeString(rawStr) + if err != nil { + return err + } + *b = bytes + return nil +} + +// Boolean Type for capturing booleans, see: +// https://github.com/alecthomas/participle#capturing-boolean-value +type Boolean bool + +func (b *Boolean) Capture(values []string) error { + *b = values[0] == "true" + return nil +} + func ParseQueries(statements []string, functions map[string]interface{}, pathParser PathExpressionParser) ([]Query, error) { queries := make([]Query, 0) var errors error @@ -131,6 +149,7 @@ func parseQuery(raw string) (*ParsedQuery, error) { func newParser() *participle.Parser { lex := lexer.MustSimple([]lexer.SimpleRule{ {Name: `Ident`, Pattern: `[a-zA-Z_][a-zA-Z0-9_]*`}, + {Name: `Bytes`, Pattern: `0x[a-fA-F0-9]+`}, {Name: `Float`, Pattern: `[-+]?\d*\.\d+([eE][-+]?\d+)?`}, {Name: `Int`, Pattern: `[-+]?\d+`}, {Name: `String`, Pattern: `"(\\"|[^"])*"`}, @@ -147,3 +166,29 @@ func newParser() *participle.Parser { } return parser } + +func ParseSpanID(spanIDStr string) (pcommon.SpanID, error) { + id, err := hex.DecodeString(spanIDStr) + if err != nil { + return pcommon.SpanID{}, err + } + if len(id) != 8 { + return pcommon.SpanID{}, errors.New("span ids must be 8 bytes") + } + var idArr [8]byte + copy(idArr[:8], id) + return pcommon.NewSpanID(idArr), nil +} + +func ParseTraceID(traceIDStr string) (pcommon.TraceID, error) { + id, err := hex.DecodeString(traceIDStr) + if err != nil { + return pcommon.TraceID{}, err + } + if len(id) != 16 { + return pcommon.TraceID{}, errors.New("traces ids must be 16 bytes") + } + var idArr [16]byte + copy(idArr[:16], id) + return pcommon.NewTraceID(idArr), nil +} diff --git a/processor/transformprocessor/internal/common/parser_test.go b/processor/transformprocessor/internal/common/parser_test.go index 127b7b553673..e566afeaebe5 100644 --- a/processor/transformprocessor/internal/common/parser_test.go +++ b/processor/transformprocessor/internal/common/parser_test.go @@ -313,6 +313,30 @@ func Test_parse(t *testing.T) { Condition: nil, }, }, + { + query: `set(attributes["bytes"], 0x0102030405060708)`, + expected: &ParsedQuery{ + Invocation: Invocation{ + Function: "set", + Arguments: []Value{ + { + Path: &Path{ + Fields: []Field{ + { + Name: "attributes", + MapKey: testhelper.Strp("bytes"), + }, + }, + }, + }, + { + Bytes: (*Bytes)(&[]byte{1, 2, 3, 4, 5, 6, 7, 8}), + }, + }, + }, + Condition: nil, + }, + }, } for _, tt := range tests { @@ -331,6 +355,12 @@ func Test_parse_failure(t *testing.T) { `set(name.)`, `("foo")`, `set("foo") where name =||= "fido"`, + `set(span_id, SpanIDWrapper{not a hex string})`, + `set(span_id, SpanIDWrapper{01})`, + `set(span_id, SpanIDWrapper{010203040506070809})`, + `set(trace_id, TraceIDWrapper{not a hex string})`, + `set(trace_id, TraceIDWrapper{0102030405060708090a0b0c0d0e0f})`, + `set(trace_id, TraceIDWrapper{0102030405060708090a0b0c0d0e0f1011})`, } for _, tt := range tests { t.Run(tt, func(t *testing.T) { diff --git a/processor/transformprocessor/internal/logs/logs.go b/processor/transformprocessor/internal/logs/logs.go index 1dcd9b5ae083..b49fe5eca9f8 100644 --- a/processor/transformprocessor/internal/logs/logs.go +++ b/processor/transformprocessor/internal/logs/logs.go @@ -16,7 +16,6 @@ package logs // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs" import ( - "encoding/hex" "fmt" "time" @@ -317,10 +316,9 @@ func accessTraceID() pathGetSetter { }, setter: func(ctx common.TransformContext, val interface{}) { if str, ok := val.(string); ok { - id, _ := hex.DecodeString(str) - var idArr [16]byte - copy(idArr[:16], id) - ctx.GetItem().(plog.LogRecord).SetTraceID(pcommon.NewTraceID(idArr)) + if traceID, err := common.ParseTraceID(str); err == nil { + ctx.GetItem().(plog.LogRecord).SetTraceID(traceID) + } } }, } @@ -333,10 +331,9 @@ func accessSpanID() pathGetSetter { }, setter: func(ctx common.TransformContext, val interface{}) { if str, ok := val.(string); ok { - id, _ := hex.DecodeString(str) - var idArr [8]byte - copy(idArr[:8], id) - ctx.GetItem().(plog.LogRecord).SetSpanID(pcommon.NewSpanID(idArr)) + if spanID, err := common.ParseSpanID(str); err == nil { + ctx.GetItem().(plog.LogRecord).SetSpanID(spanID) + } } }, } diff --git a/processor/transformprocessor/internal/traces/processor_test.go b/processor/transformprocessor/internal/traces/processor_test.go index 88024654a2d1..b3de0a0c67bc 100644 --- a/processor/transformprocessor/internal/traces/processor_test.go +++ b/processor/transformprocessor/internal/traces/processor_test.go @@ -65,6 +65,18 @@ func TestProcess(t *testing.T) { td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1).Status().SetCode(ptrace.StatusCodeOk) }, }, + { + query: `set(attributes["test"], "pass") where trace_id == TraceID(0x01000000000000000000000000000000)`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().InsertString("test", "pass") + }, + }, + { + query: `set(attributes["test"], "pass") where span_id == SpanID(0x0100000000000000)`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().InsertString("test", "pass") + }, + }, } for _, tt := range tests { @@ -196,6 +208,8 @@ func constructTracesNum(num int) ptrace.Traces { func fillSpanOne(span ptrace.Span) { span.SetName("operationA") + span.SetSpanID(pcommon.NewSpanID([8]byte{1, 0, 0, 0, 0, 0, 0, 0})) + span.SetTraceID(pcommon.NewTraceID([16]byte{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0})) span.SetStartTimestamp(TestSpanStartTimestamp) span.SetEndTimestamp(TestSpanEndTimestamp) span.SetDroppedAttributesCount(1) diff --git a/processor/transformprocessor/internal/traces/traces.go b/processor/transformprocessor/internal/traces/traces.go index 045be9527720..e890cf342820 100644 --- a/processor/transformprocessor/internal/traces/traces.go +++ b/processor/transformprocessor/internal/traces/traces.go @@ -223,10 +223,9 @@ func accessTraceID() pathGetSetter { }, setter: func(ctx common.TransformContext, val interface{}) { if str, ok := val.(string); ok { - id, _ := hex.DecodeString(str) - var idArr [16]byte - copy(idArr[:16], id) - ctx.GetItem().(ptrace.Span).SetTraceID(pcommon.NewTraceID(idArr)) + if traceID, err := common.ParseTraceID(str); err == nil { + ctx.GetItem().(ptrace.Span).SetTraceID(traceID) + } } }, } @@ -239,10 +238,9 @@ func accessSpanID() pathGetSetter { }, setter: func(ctx common.TransformContext, val interface{}) { if str, ok := val.(string); ok { - id, _ := hex.DecodeString(str) - var idArr [8]byte - copy(idArr[:8], id) - ctx.GetItem().(ptrace.Span).SetSpanID(pcommon.NewSpanID(idArr)) + if spanID, err := common.ParseSpanID(str); err == nil { + ctx.GetItem().(ptrace.Span).SetSpanID(spanID) + } } }, }