Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add transition from instrumentationlibrary -> scope #5085

Merged
merged 6 commits into from
Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
In case of type mismatch, they don't panic right away but return an invalid zero-initialized
instance for consistency with other OneOf field accessors (#5034)
- Update OTLP to v0.15.0 (#5064)
- Adding support for transition from older versions of OTLP to OTLP v0.15.0 (#5085)

### 🧰 Bug fixes 🧰

Expand Down
4 changes: 4 additions & 0 deletions model/otlp/json_unmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
otlpmetrics "go.opentelemetry.io/collector/model/internal/data/protogen/metrics/v1"
otlptrace "go.opentelemetry.io/collector/model/internal/data/protogen/trace/v1"
ipdata "go.opentelemetry.io/collector/model/internal/pdata"
"go.opentelemetry.io/collector/model/otlpgrpc"
"go.opentelemetry.io/collector/model/pdata"
)

Expand Down Expand Up @@ -54,6 +55,7 @@ func (d *jsonUnmarshaler) UnmarshalLogs(buf []byte) (pdata.Logs, error) {
if err := d.delegate.Unmarshal(bytes.NewReader(buf), ld); err != nil {
return pdata.Logs{}, err
}
otlpgrpc.InstrumentationLibraryLogsToScope(ld.ResourceLogs)
return ipdata.LogsFromOtlp(ld), nil
}

Expand All @@ -62,6 +64,7 @@ func (d *jsonUnmarshaler) UnmarshalMetrics(buf []byte) (pdata.Metrics, error) {
if err := d.delegate.Unmarshal(bytes.NewReader(buf), md); err != nil {
return pdata.Metrics{}, err
}
otlpgrpc.InstrumentationLibraryMetricsToScope(md.ResourceMetrics)
return ipdata.MetricsFromOtlp(md), nil
}

Expand All @@ -70,5 +73,6 @@ func (d *jsonUnmarshaler) UnmarshalTraces(buf []byte) (pdata.Traces, error) {
if err := d.delegate.Unmarshal(bytes.NewReader(buf), td); err != nil {
return pdata.Traces{}, err
}
otlpgrpc.InstrumentationLibrarySpansToScope(td.ResourceSpans)
return ipdata.TracesFromOtlp(td), nil
}
39 changes: 37 additions & 2 deletions model/otlpgrpc/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"google.golang.org/grpc"

otlpcollectorlog "go.opentelemetry.io/collector/model/internal/data/protogen/collector/logs/v1"
v1 "go.opentelemetry.io/collector/model/internal/data/protogen/common/v1"
otlplogs "go.opentelemetry.io/collector/model/internal/data/protogen/logs/v1"
ipdata "go.opentelemetry.io/collector/model/internal/pdata"
"go.opentelemetry.io/collector/model/pdata"
Expand Down Expand Up @@ -119,7 +120,11 @@ func (lr LogsRequest) MarshalProto() ([]byte, error) {

// UnmarshalProto unmarshalls LogsRequest from proto bytes.
func (lr LogsRequest) UnmarshalProto(data []byte) error {
return lr.orig.Unmarshal(data)
if err := lr.orig.Unmarshal(data); err != nil {
return err
}
InstrumentationLibraryLogsToScope(lr.orig.ResourceLogs)
return nil
}

// MarshalJSON marshals LogsRequest into JSON bytes.
Expand All @@ -133,7 +138,11 @@ func (lr LogsRequest) MarshalJSON() ([]byte, error) {

// UnmarshalJSON unmarshalls LogsRequest from JSON bytes.
func (lr LogsRequest) UnmarshalJSON(data []byte) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to your change: looks like for some reason we now have 2 places in the codebase to unmarshal OTLP JSON. I am not sure if this duplication is intentional.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is intentional because we have logically 2 "encodings":

  1. For Request/Response for grpc/http
  2. For persistent storage like Kafka/File etc. See proto Request message vs TracesData.

return jsonUnmarshaler.Unmarshal(bytes.NewReader(data), lr.orig)
if err := jsonUnmarshaler.Unmarshal(bytes.NewReader(data), lr.orig); err != nil {
return err
}
InstrumentationLibraryLogsToScope(lr.orig.ResourceLogs)
return nil
}

func (lr LogsRequest) SetLogs(ld pdata.Logs) {
Expand Down Expand Up @@ -191,3 +200,29 @@ func (s rawLogsServer) Export(ctx context.Context, request *otlpcollectorlog.Exp
rsp, err := s.srv.Export(ctx, LogsRequest{orig: request})
return rsp.orig, err
}

// InstrumentationLibraryLogsToScope implements the translation of resource logs data
// following the v0.15.0 upgrade:
// receivers SHOULD check if instrumentation_library_logs is set
// and scope_logs is not set then the value in instrumentation_library_logs
// SHOULD be used instead by converting InstrumentationLibraryLogs into ScopeLogs.
// If scope_logs is set then instrumentation_library_logs SHOULD be ignored.
// https://github.com/open-telemetry/opentelemetry-proto/blob/3c2915c01a9fb37abfc0415ec71247c4978386b0/opentelemetry/proto/logs/v1/logs.proto#L58
func InstrumentationLibraryLogsToScope(rls []*otlplogs.ResourceLogs) {
for _, rl := range rls {
if len(rl.ScopeLogs) == 0 {
for _, ill := range rl.InstrumentationLibraryLogs {
scopeLogs := otlplogs.ScopeLogs{
Scope: v1.InstrumentationScope{
Name: ill.InstrumentationLibrary.Name,
Version: ill.InstrumentationLibrary.Version,
},
LogRecords: ill.LogRecords,
SchemaUrl: ill.SchemaUrl,
}
rl.ScopeLogs = append(rl.ScopeLogs, &scopeLogs)
}
}
rl.InstrumentationLibraryLogs = nil
}
}
149 changes: 134 additions & 15 deletions model/otlpgrpc/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"google.golang.org/grpc/status"
"google.golang.org/grpc/test/bufconn"

v1 "go.opentelemetry.io/collector/model/internal/data/protogen/logs/v1"
"go.opentelemetry.io/collector/model/internal/pdata"
)

Expand All @@ -42,27 +43,88 @@ var _ json.Marshaler = LogsRequest{}

var logsRequestJSON = []byte(`
{
"resourceLogs": [
"resourceLogs": [
{
"resource": {},
"scopeLogs": [
{
"scope": {},
"logRecords": [
"resource": {},
"scopeLogs": [
{
"body": {
"stringValue": "test_log_record"
},
"traceId": "",
"spanId": ""
"scope": {},
"logRecords": [
{
"body": {
"stringValue": "test_log_record"
},
"traceId": "",
"spanId": ""
}
]
}
]
}
]
]
}
]
]
}`)

var logsTransitionData = [][]byte{
[]byte(`
{
"resourceLogs": [
{
"resource": {},
"instrumentationLibraryLogs": [
{
"instrumentationLibrary": {},
"logRecords": [
{
"body": {
"stringValue": "test_log_record"
},
"traceId": "",
"spanId": ""
}
]
}
]
}
]
}`),
[]byte(`
{
"resourceLogs": [
{
"resource": {},
"instrumentationLibraryLogs": [
{
"instrumentationLibrary": {},
"logRecords": [
{
"body": {
"stringValue": "test_log_record"
},
"traceId": "",
"spanId": ""
}
]
}
],
"scopeLogs": [
{
"scope": {},
"logRecords": [
{
"body": {
"stringValue": "test_log_record"
},
"traceId": "",
"spanId": ""
}
]
}
]
}
]
}`),
}

func TestLogsRequestJSON(t *testing.T) {
lr := NewLogsRequest()
assert.NoError(t, lr.UnmarshalJSON(logsRequestJSON))
Expand All @@ -73,6 +135,18 @@ func TestLogsRequestJSON(t *testing.T) {
assert.Equal(t, strings.Join(strings.Fields(string(logsRequestJSON)), ""), string(got))
}

func TestLogsRequestJSONTransition(t *testing.T) {
for _, data := range logsTransitionData {
lr := NewLogsRequest()
assert.NoError(t, lr.UnmarshalJSON(data))
assert.Equal(t, "test_log_record", lr.Logs().ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().AsString())

got, err := lr.MarshalJSON()
assert.NoError(t, err)
assert.Equal(t, strings.Join(strings.Fields(string(logsRequestJSON)), ""), string(got))
}
}

func TestLogsRequestJSON_Deprecated(t *testing.T) {
lr, err := UnmarshalJSONLogsRequest(logsRequestJSON)
assert.NoError(t, err)
Expand Down Expand Up @@ -116,6 +190,41 @@ func TestLogsGrpc(t *testing.T) {
assert.Equal(t, NewLogsResponse(), resp)
}

func TestLogsGrpcTransition(t *testing.T) {
lis := bufconn.Listen(1024 * 1024)
s := grpc.NewServer()
RegisterLogsServer(s, &fakeLogsServer{t: t})
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, s.Serve(lis))
}()
t.Cleanup(func() {
s.Stop()
wg.Wait()
})

cc, err := grpc.Dial("bufnet",
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return lis.Dial()
}),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock())
assert.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, cc.Close())
})

logClient := NewLogsClient(cc)

req := generateLogsRequestWithInstrumentationLibrary()
InstrumentationLibraryLogsToScope(req.orig.ResourceLogs)
resp, err := logClient.Export(context.Background(), req)
assert.NoError(t, err)
assert.Equal(t, NewLogsResponse(), resp)
}

func TestLogsGrpcError(t *testing.T) {
lis := bufconn.Listen(1024 * 1024)
s := grpc.NewServer()
Expand Down Expand Up @@ -170,3 +279,13 @@ func generateLogsRequest() LogsRequest {
lr.SetLogs(ld)
return lr
}

func generateLogsRequestWithInstrumentationLibrary() LogsRequest {
lr := generateLogsRequest()
lr.orig.ResourceLogs[0].InstrumentationLibraryLogs = []*v1.InstrumentationLibraryLogs{ //nolint:staticcheck // SA1019 ignore this!
{
LogRecords: lr.orig.ResourceLogs[0].ScopeLogs[0].LogRecords,
},
}
return lr
}
33 changes: 32 additions & 1 deletion model/otlpgrpc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"google.golang.org/grpc"

otlpcollectormetrics "go.opentelemetry.io/collector/model/internal/data/protogen/collector/metrics/v1"
v1 "go.opentelemetry.io/collector/model/internal/data/protogen/common/v1"
otlpmetrics "go.opentelemetry.io/collector/model/internal/data/protogen/metrics/v1"
ipdata "go.opentelemetry.io/collector/model/internal/pdata"
"go.opentelemetry.io/collector/model/pdata"
Expand Down Expand Up @@ -129,7 +130,11 @@ func (mr MetricsRequest) MarshalJSON() ([]byte, error) {

// UnmarshalJSON unmarshalls MetricsRequest from JSON bytes.
func (mr MetricsRequest) UnmarshalJSON(data []byte) error {
return jsonUnmarshaler.Unmarshal(bytes.NewReader(data), mr.orig)
if err := jsonUnmarshaler.Unmarshal(bytes.NewReader(data), mr.orig); err != nil {
return err
}
InstrumentationLibraryMetricsToScope(mr.orig.ResourceMetrics)
return nil
}

func (mr MetricsRequest) SetMetrics(ld pdata.Metrics) {
Expand Down Expand Up @@ -187,3 +192,29 @@ func (s rawMetricsServer) Export(ctx context.Context, request *otlpcollectormetr
rsp, err := s.srv.Export(ctx, MetricsRequest{orig: request})
return rsp.orig, err
}

// InstrumentationLibraryMetricsToScope implements the translation of resource metrics data
// following the v0.15.0 upgrade:
// receivers SHOULD check if instrumentation_library_metrics is set
// and scope_metrics is not set then the value in instrumentation_library_metrics
// SHOULD be used instead by converting InstrumentationLibraryMetrics into ScopeMetrics.
// If scope_metrics is set then instrumentation_library_metrics SHOULD be ignored.
// https://github.com/open-telemetry/opentelemetry-proto/blob/3c2915c01a9fb37abfc0415ec71247c4978386b0/opentelemetry/proto/metrics/v1/metrics.proto#L58
func InstrumentationLibraryMetricsToScope(rms []*otlpmetrics.ResourceMetrics) {
for _, rm := range rms {
if len(rm.ScopeMetrics) == 0 {
for _, ilm := range rm.InstrumentationLibraryMetrics {
scopeMetrics := otlpmetrics.ScopeMetrics{
Scope: v1.InstrumentationScope{
Name: ilm.InstrumentationLibrary.Name,
Version: ilm.InstrumentationLibrary.Version,
},
Metrics: ilm.Metrics,
SchemaUrl: ilm.SchemaUrl,
}
rm.ScopeMetrics = append(rm.ScopeMetrics, &scopeMetrics)
}
}
rm.InstrumentationLibraryMetrics = nil
}
}
Loading