diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a8adaf8409..7c19463dcb6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,7 @@ In case of type mismatch, they don't panic right away but return an invalid zero-initialized instance for consistency with other OneOf field accessors (#5034) - Update OTLP to v0.15.0 (#5064) +- Add `jsoniter` Unmarshaller (#4817) ### 🧰 Bug fixes 🧰 diff --git a/cmd/otelcorecol/go.mod b/cmd/otelcorecol/go.mod index 06f4ffed819..e92dbf9c88d 100644 --- a/cmd/otelcorecol/go.mod +++ b/cmd/otelcorecol/go.mod @@ -29,6 +29,7 @@ require ( github.com/google/uuid v1.3.0 // indirect github.com/gorilla/mux v1.8.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.15.1 // indirect github.com/knadh/koanf v1.4.0 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect @@ -37,6 +38,8 @@ require ( github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/mapstructure v1.4.3 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mostynb/go-grpc-compression v1.1.16 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect diff --git a/go.mod b/go.mod index 00fcc40a556..f1a4b66d767 100644 --- a/go.mod +++ b/go.mod @@ -55,10 +55,13 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/prometheus/client_golang v1.12.1 // indirect diff --git a/go.sum b/go.sum index 36b0ee376d8..53614f36dd2 100644 --- a/go.sum +++ b/go.sum @@ -229,6 +229,7 @@ github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= @@ -276,9 +277,11 @@ github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mostynb/go-grpc-compression v1.1.16 h1:D9tGUINmcII049pxOj9dl32Fzhp26TrDVQXECoKJqQg= github.com/mostynb/go-grpc-compression v1.1.16/go.mod h1:xxa6UoYynYS2h+5HB/Hglu81iYAp87ARaNmhhwi0s1s= diff --git a/model/go.mod b/model/go.mod index 6497c4ab029..7371a3dff8b 100644 --- a/model/go.mod +++ b/model/go.mod @@ -4,14 +4,17 @@ go 1.17 require ( github.com/gogo/protobuf v1.3.2 + github.com/json-iterator/go v1.1.12 github.com/stretchr/testify v1.7.1 google.golang.org/grpc v1.45.0 google.golang.org/protobuf v1.27.1 ) require ( - github.com/davecgh/go-spew v1.1.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/protobuf v1.5.2 // indirect + github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect golang.org/x/net v0.0.0-20201021035429-f5854403a974 // indirect golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f // indirect diff --git a/model/go.sum b/model/go.sum index 62fd17b270a..e423d83d283 100644 --- a/model/go.sum +++ b/model/go.sum @@ -11,8 +11,9 @@ github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XP github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -45,15 +46,23 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= diff --git a/model/otlp/jsoniter_unmarshaler.go b/model/otlp/jsoniter_unmarshaler.go new file mode 100644 index 00000000000..f4829f37337 --- /dev/null +++ b/model/otlp/jsoniter_unmarshaler.go @@ -0,0 +1,393 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlp // import "go.opentelemetry.io/collector/model/otlp" + +import ( + "encoding/base64" + "errors" + "fmt" + + jsoniter "github.com/json-iterator/go" + + otlpcommon "go.opentelemetry.io/collector/model/internal/data/protogen/common/v1" + otlplogs "go.opentelemetry.io/collector/model/internal/data/protogen/logs/v1" + otlpmetrics "go.opentelemetry.io/collector/model/internal/data/protogen/metrics/v1" + otlptrace "go.opentelemetry.io/collector/model/internal/data/protogen/trace/v1" + ipdata "go.opentelemetry.io/collector/model/internal/pdata" + "go.opentelemetry.io/collector/model/pdata" +) + +// NewJSONIterTracesUnmarshaler returns a model.TracesUnmarshaler. Unmarshals from OTLP json bytes. +func NewJSONIterTracesUnmarshaler() pdata.TracesUnmarshaler { + return newJSONIterUnmarshaler() +} + +// NewJSONIterMetricsUnmarshaler returns a model.MetricsUnmarshaler. Unmarshals from OTLP json bytes. +func NewJSONIterMetricsUnmarshaler() pdata.MetricsUnmarshaler { + return newJSONIterUnmarshaler() +} + +// NewJSONIterLogsUnmarshaler returns a model.LogsUnmarshaler. Unmarshals from OTLP json bytes. +func NewJSONIterLogsUnmarshaler() pdata.LogsUnmarshaler { + return newJSONIterUnmarshaler() +} + +type jsonIterUnmarshaler struct { +} + +func newJSONIterUnmarshaler() *jsonIterUnmarshaler { + return &jsonIterUnmarshaler{} +} + +func (d *jsonIterUnmarshaler) UnmarshalLogs(buf []byte) (pdata.Logs, error) { + ld := &otlplogs.LogsData{} + return ipdata.LogsFromOtlp(ld), errors.New("unimplemented") +} + +func (d *jsonIterUnmarshaler) UnmarshalMetrics(buf []byte) (pdata.Metrics, error) { + md := &otlpmetrics.MetricsData{} + return ipdata.MetricsFromOtlp(md), errors.New("unimplemented") +} + +func (d *jsonIterUnmarshaler) UnmarshalTraces(buf []byte) (pdata.Traces, error) { + iter := jsoniter.ConfigFastest.BorrowIterator(buf) + td := readTraceData(iter) + err := iter.Error + jsoniter.ConfigFastest.ReturnIterator(iter) + return ipdata.TracesFromOtlp(td), err +} + +func readTraceData(iter *jsoniter.Iterator) *otlptrace.TracesData { + td := &otlptrace.TracesData{} + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "resourceSpans", "resource_spans": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + td.ResourceSpans = append(td.ResourceSpans, readResourceSpans(iter)) + return true + }) + default: + iter.ReportError("root", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + return td +} + +func readResourceSpans(iter *jsoniter.Iterator) *otlptrace.ResourceSpans { + rs := &otlptrace.ResourceSpans{} + + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "resource": + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "attributes": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + rs.Resource.Attributes = append(rs.Resource.Attributes, readAttribute(iter)) + return true + }) + case "droppedAttributesCount", "dropped_attributes_count": + rs.Resource.DroppedAttributesCount = iter.ReadUint32() + default: + iter.ReportError("readResourceSpans.resource", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + case "instrumentationLibrarySpans", "instrumentation_library_spans", "scopeSpans", "scope_spans": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + rs.ScopeSpans = append(rs.ScopeSpans, + readInstrumentationLibrarySpans(iter)) + return true + }) + case "schemaUrl", "schema_url": + rs.SchemaUrl = iter.ReadString() + default: + iter.ReportError("readResourceSpans", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + return rs +} + +func readInstrumentationLibrarySpans(iter *jsoniter.Iterator) *otlptrace.ScopeSpans { + ils := &otlptrace.ScopeSpans{} + + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "instrumentationLibrary", "instrumentation_library", "scope": + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "name": + ils.Scope.Name = iter.ReadString() + case "version": + ils.Scope.Version = iter.ReadString() + default: + iter.ReportError("readInstrumentationLibrarySpans.instrumentationLibrary", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + case "spans": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + ils.Spans = append(ils.Spans, readSpan(iter)) + return true + }) + case "schemaUrl", "schema_url": + ils.SchemaUrl = iter.ReadString() + default: + iter.ReportError("readInstrumentationLibrarySpans", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + return ils +} + +func readSpan(iter *jsoniter.Iterator) *otlptrace.Span { + sp := &otlptrace.Span{} + + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "traceId", "trace_id": + if err := sp.TraceId.UnmarshalJSON([]byte(iter.ReadString())); err != nil { + iter.ReportError("readSpan.traceId", fmt.Sprintf("parse trace_id:%v", err)) + } + case "spanId", "span_id": + if err := sp.SpanId.UnmarshalJSON([]byte(iter.ReadString())); err != nil { + iter.ReportError("readSpan.spanId", fmt.Sprintf("parse span_id:%v", err)) + } + case "traceState", "trace_state": + sp.TraceState = iter.ReadString() + case "parentSpanId", "parent_span_id": + if err := sp.ParentSpanId.UnmarshalJSON([]byte(iter.ReadString())); err != nil { + iter.ReportError("readSpan.parentSpanId", fmt.Sprintf("parse parent_span_id:%v", err)) + } + case "name": + sp.Name = iter.ReadString() + case "kind": + sp.Kind = readSpanKind(iter) + case "startTimeUnixNano", "start_time_unix_nano": + sp.StartTimeUnixNano = uint64(readInt64(iter)) + case "endTimeUnixNano", "end_time_unix_nano": + sp.EndTimeUnixNano = uint64(readInt64(iter)) + case "attributes": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + sp.Attributes = append(sp.Attributes, readAttribute(iter)) + return true + }) + case "droppedAttributesCount", "dropped_attributes_count": + sp.DroppedAttributesCount = iter.ReadUint32() + case "events": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + sp.Events = append(sp.Events, readSpanEvent(iter)) + return true + }) + case "droppedEventsCount", "dropped_events_count": + sp.DroppedEventsCount = iter.ReadUint32() + case "links": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + sp.Links = append(sp.Links, readSpanLink(iter)) + return true + }) + case "droppedLinksCount", "dropped_links_count": + sp.DroppedLinksCount = iter.ReadUint32() + case "status": + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "message": + sp.Status.Message = iter.ReadString() + case "code": + sp.Status.Code = readStatusCode(iter) + default: + iter.ReportError("readSpan.status", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + default: + iter.ReportError("readSpan", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + return sp +} + +func readSpanLink(iter *jsoniter.Iterator) *otlptrace.Span_Link { + link := &otlptrace.Span_Link{} + + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "traceId", "trace_id": + if err := link.TraceId.UnmarshalJSON([]byte(iter.ReadString())); err != nil { + iter.ReportError("readSpanLink", fmt.Sprintf("parse trace_id:%v", err)) + } + case "spanId", "span_id": + if err := link.SpanId.UnmarshalJSON([]byte(iter.ReadString())); err != nil { + iter.ReportError("readSpanLink", fmt.Sprintf("parse span_id:%v", err)) + } + case "traceState", "trace_state": + link.TraceState = iter.ReadString() + case "attributes": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + link.Attributes = append(link.Attributes, readAttribute(iter)) + return true + }) + case "droppedAttributesCount", "dropped_attributes_count": + link.DroppedAttributesCount = iter.ReadUint32() + default: + iter.ReportError("readSpanLink", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + return link +} + +func readSpanEvent(iter *jsoniter.Iterator) *otlptrace.Span_Event { + event := &otlptrace.Span_Event{} + + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "timeUnixNano", "time_unix_nano": + event.TimeUnixNano = uint64(readInt64(iter)) + case "name": + event.Name = iter.ReadString() + case "attributes": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + event.Attributes = append(event.Attributes, readAttribute(iter)) + return true + }) + case "droppedAttributesCount", "dropped_attributes_count": + event.DroppedAttributesCount = iter.ReadUint32() + default: + iter.ReportError("readSpanEvent", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + return event +} + +func readAttribute(iter *jsoniter.Iterator) otlpcommon.KeyValue { + var ( + key string + value otlpcommon.AnyValue + ) + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "key": + key = iter.ReadString() + case "value": + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + value = readAnyValue(iter, f) + return true + }) + default: + iter.ReportError("readAttribute", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + return otlpcommon.KeyValue{ + Key: key, + Value: value, + } +} + +func readAnyValue(iter *jsoniter.Iterator, f string) otlpcommon.AnyValue { + switch f { + case "stringValue", "string_value": + return otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_StringValue{ + StringValue: iter.ReadString(), + }, + } + case "boolValue", "bool_value": + return otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_BoolValue{ + BoolValue: iter.ReadBool(), + }, + } + case "intValue", "int_value": + return otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_IntValue{ + IntValue: readInt64(iter), + }, + } + case "doubleValue", "double_value": + return otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_DoubleValue{ + DoubleValue: iter.ReadFloat64(), + }, + } + case "bytesValue", "bytes_value": + v, err := base64.StdEncoding.DecodeString(iter.ReadString()) + if err != nil { + iter.ReportError("bytesValue", fmt.Sprintf("base64 decode:%v", err)) + return otlpcommon.AnyValue{} + } + return otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_BytesValue{ + BytesValue: v, + }, + } + case "arrayValue", "array_value": + return otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_ArrayValue{ + ArrayValue: readArray(iter), + }, + } + default: + iter.ReportError("readAnyValue", fmt.Sprintf("unknown field:%v", f)) + return otlpcommon.AnyValue{} + } +} + +func readArray(iter *jsoniter.Iterator) *otlpcommon.ArrayValue { + v := &otlpcommon.ArrayValue{} + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "values": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + v.Values = append(v.Values, readAnyValue(iter, f)) + return true + }) + return true + }) + default: + iter.ReportError("readArray", fmt.Sprintf("unknown field:%s", f)) + } + return true + }) + return v +} + +func readInt64(iter *jsoniter.Iterator) int64 { + return iter.ReadAny().ToInt64() +} + +func readSpanKind(iter *jsoniter.Iterator) otlptrace.Span_SpanKind { + any := iter.ReadAny() + if v := any.ToInt(); v > 0 { + return otlptrace.Span_SpanKind(v) + } + v := any.ToString() + return otlptrace.Span_SpanKind(otlptrace.Span_SpanKind_value[v]) +} + +func readStatusCode(iter *jsoniter.Iterator) otlptrace.Status_StatusCode { + any := iter.ReadAny() + if v := any.ToInt(); v > 0 { + return otlptrace.Status_StatusCode(v) + } + v := any.ToString() + return otlptrace.Status_StatusCode(otlptrace.Status_StatusCode_value[v]) +} diff --git a/model/otlp/jsoniter_unmarshaler_test.go b/model/otlp/jsoniter_unmarshaler_test.go new file mode 100644 index 00000000000..be2fd01469a --- /dev/null +++ b/model/otlp/jsoniter_unmarshaler_test.go @@ -0,0 +1,421 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlp + +import ( + "fmt" + "testing" + "time" + + jsoniter "github.com/json-iterator/go" + "github.com/stretchr/testify/assert" + + v1 "go.opentelemetry.io/collector/model/internal/data/protogen/trace/v1" + "go.opentelemetry.io/collector/model/pdata" +) + +var tracesOTLPFull = func() pdata.Traces { + traceID := pdata.NewTraceID([16]byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10}) + spanID := pdata.NewSpanID([8]byte{0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18}) + td := pdata.NewTraces() + // Add ResourceSpans. + rs := td.ResourceSpans().AppendEmpty() + rs.SetSchemaUrl("schemaURL") + // Add resource. + rs.Resource().Attributes().UpsertString("host.name", "testHost") + rs.Resource().Attributes().UpsertString("service.name", "testService") + rs.Resource().SetDroppedAttributesCount(1) + // Add InstrumentationLibrarySpans. + il := rs.InstrumentationLibrarySpans().AppendEmpty() + il.InstrumentationLibrary().SetName("instrumentation name") + il.InstrumentationLibrary().SetVersion("instrumentation version") + il.SetSchemaUrl("schemaURL") + // Add spans. + sp := il.Spans().AppendEmpty() + sp.SetName("testSpan") + sp.SetKind(pdata.SpanKindClient) + sp.SetDroppedAttributesCount(1) + sp.SetStartTimestamp(pdata.NewTimestampFromTime(time.Now())) + sp.SetTraceID(traceID) + sp.SetSpanID(spanID) + sp.SetDroppedEventsCount(1) + sp.SetDroppedLinksCount(1) + sp.SetEndTimestamp(pdata.NewTimestampFromTime(time.Now())) + sp.SetParentSpanID(spanID) + sp.SetTraceState("state") + sp.Status().SetCode(pdata.StatusCodeOk) + sp.Status().SetMessage("message") + // Add attributes. + sp.Attributes().UpsertString("string", "value") + sp.Attributes().UpsertBool("bool", true) + sp.Attributes().UpsertInt("int", 1) + sp.Attributes().UpsertDouble("double", 1.1) + sp.Attributes().UpsertBytes("bytes", []byte("foo")) + arr := pdata.NewValueSlice() + arr.SliceVal().AppendEmpty().SetIntVal(1) + sp.Attributes().Upsert("array", arr) + // Add events. + event := sp.Events().AppendEmpty() + event.SetName("eventName") + event.SetTimestamp(pdata.NewTimestampFromTime(time.Now())) + event.SetDroppedAttributesCount(1) + event.Attributes().UpsertString("string", "value") + event.Attributes().UpsertBool("bool", true) + event.Attributes().UpsertInt("int", 1) + event.Attributes().UpsertDouble("double", 1.1) + event.Attributes().UpsertBytes("bytes", []byte("foo")) + // Add links. + link := sp.Links().AppendEmpty() + link.SetTraceState("state") + link.SetTraceID(traceID) + link.SetSpanID(spanID) + link.SetDroppedAttributesCount(1) + link.Attributes().UpsertString("string", "value") + link.Attributes().UpsertBool("bool", true) + link.Attributes().UpsertInt("int", 1) + link.Attributes().UpsertDouble("double", 1.1) + link.Attributes().UpsertBytes("bytes", []byte("foo")) + // Add another span. + sp2 := il.Spans().AppendEmpty() + sp2.SetName("testSpan2") + return td +}() + +func TestLogsJSONIter(t *testing.T) { + encoder := NewJSONLogsMarshaler() + jsonBuf, err := encoder.MarshalLogs(logsOTLP) + assert.NoError(t, err) + + decoder := NewJSONIterLogsUnmarshaler() + _, err = decoder.UnmarshalLogs(jsonBuf) + assert.Error(t, err) +} + +func TestMetricsJSONIter(t *testing.T) { + encoder := NewJSONMetricsMarshaler() + jsonBuf, err := encoder.MarshalMetrics(metricsOTLP) + assert.NoError(t, err) + + decoder := NewJSONIterMetricsUnmarshaler() + _, err = decoder.UnmarshalMetrics(jsonBuf) + assert.Error(t, err) +} + +func TestTracesJSONIter(t *testing.T) { + encoder := NewJSONTracesMarshaler() + jsonBuf, err := encoder.MarshalTraces(tracesOTLPFull) + assert.NoError(t, err) + + decoder := NewJSONIterTracesUnmarshaler() + got, err := decoder.UnmarshalTraces(jsonBuf) + assert.NoError(t, err) + assert.EqualValues(t, tracesOTLPFull, got) +} + +func BenchmarkTracesJSONUnmarshal(b *testing.B) { + b.ReportAllocs() + + encoder := NewJSONTracesMarshaler() + jsonBuf, err := encoder.MarshalTraces(tracesOTLPFull) + assert.NoError(b, err) + decoder := newJSONUnmarshaler() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, err := decoder.UnmarshalTraces(jsonBuf) + assert.NoError(b, err) + } + }) +} + +func BenchmarkTracesJSONiterUnmarshal(b *testing.B) { + b.ReportAllocs() + + encoder := NewJSONTracesMarshaler() + jsonBuf, err := encoder.MarshalTraces(tracesOTLPFull) + assert.NoError(b, err) + decoder := newJSONIterUnmarshaler() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, err := decoder.UnmarshalTraces(jsonBuf) + assert.NoError(b, err) + } + }) +} + +func TestReadInt64(t *testing.T) { + var data = `{"intAsNumber":1,"intAsString":"1"}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(data)) + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "intAsNumber": + v := readInt64(iter) + assert.Equal(t, int64(1), v) + case "intAsString": + v := readInt64(iter) + assert.Equal(t, int64(1), v) + } + return true + }) + assert.NoError(t, iter.Error) +} + +func Test_readTraceData(t *testing.T) { + t.Run("unknown field", func(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readTraceData(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } + }) +} + +func Test_readResourceSpans(t *testing.T) { + t.Run("unknown field", func(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readResourceSpans(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } + }) + t.Run("unknown resource field", func(t *testing.T) { + jsonStr := `{"resource":{"extra":""}}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readResourceSpans(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } + }) +} + +func Test_readInstrumentationLibrarySpans(t *testing.T) { + t.Run("unknown field", func(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readInstrumentationLibrarySpans(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } + }) + t.Run("unknown instrumentationLibrary field", func(t *testing.T) { + jsonStr := `{"instrumentationLibrary":{"extra":""}}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readInstrumentationLibrarySpans(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } + }) +} + +func Test_readSpan(t *testing.T) { + t.Run("unknown field", func(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpan(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } + }) + t.Run("unknown status field", func(t *testing.T) { + jsonStr := `{"status":{"extra":""}}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpan(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } + }) + t.Run("invalid trace_id field", func(t *testing.T) { + jsonStr := `{"trace_id":"--"}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpan(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "parse trace_id") + } + }) + t.Run("invalid span_id field", func(t *testing.T) { + jsonStr := `{"span_id":"--"}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpan(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "parse span_id") + } + }) + t.Run("invalid parent_span_id field", func(t *testing.T) { + jsonStr := `{"parent_span_id":"--"}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpan(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "parse parent_span_id") + } + }) +} + +func Test_readSpanLink(t *testing.T) { + t.Run("unknown field", func(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpanLink(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } + }) + t.Run("invalid trace_id field", func(t *testing.T) { + jsonStr := `{"trace_id":"--"}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpanLink(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "parse trace_id") + } + }) + t.Run("invalid span_id field", func(t *testing.T) { + jsonStr := `{"span_id":"--"}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpanLink(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "parse span_id") + } + }) +} + +func Test_readSpanEvent(t *testing.T) { + t.Run("unknown field", func(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpanEvent(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } + }) +} + +func Test_readAttribute(t *testing.T) { + t.Run("unknown field", func(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readAttribute(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } + }) +} + +func Test_readAnyValue(t *testing.T) { + t.Run("unknown field", func(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readAnyValue(iter, "") + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } + }) + t.Run("invalid bytesValue", func(t *testing.T) { + jsonStr := `"--"` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readAnyValue(iter, "bytesValue") + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "base64") + } + }) +} + +func Test_readArray(t *testing.T) { + t.Run("unknown field", func(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readArray(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } + }) +} + +func Test_readSpanKind(t *testing.T) { + tests := []struct { + name string + jsonStr string + want v1.Span_SpanKind + }{ + { + name: "string", + jsonStr: fmt.Sprintf(`"%s"`, v1.Span_SPAN_KIND_INTERNAL.String()), + want: v1.Span_SPAN_KIND_INTERNAL, + }, + { + name: "int", + jsonStr: fmt.Sprintf("%d", v1.Span_SPAN_KIND_INTERNAL), + want: v1.Span_SPAN_KIND_INTERNAL, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(tt.jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + if got := readSpanKind(iter); got != tt.want { + t.Errorf("readSpanKind() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_readStatusCode(t *testing.T) { + tests := []struct { + name string + jsonStr string + want v1.Status_StatusCode + }{ + { + name: "string", + jsonStr: fmt.Sprintf(`"%s"`, v1.Status_STATUS_CODE_ERROR.String()), + want: v1.Status_STATUS_CODE_ERROR, + }, + { + name: "int", + jsonStr: fmt.Sprintf("%d", v1.Status_STATUS_CODE_ERROR), + want: v1.Status_STATUS_CODE_ERROR, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(tt.jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + if got := readStatusCode(iter); got != tt.want { + t.Errorf("readStatusCode() = %v, want %v", got, tt.want) + } + }) + } +}