Skip to content

Commit

Permalink
add transition from instrumentationlibrary -> scope (#5085)
Browse files Browse the repository at this point in the history
* add transition from instrumentationlibrary -> scope

This is the last piece of upgrading to v0.15.0. This PR adds the tests and code to ensure the transition plan is implemented in the collector.

Fixes #5074

* fix spacing, add link

* update unmarshaler, adding tests

* update grpc tests

* update changelog

* Update model/otlpgrpc/logs.go

Co-authored-by: Tigran Najaryan <[email protected]>

Co-authored-by: Tigran Najaryan <[email protected]>
  • Loading branch information
codeboten and tigrannajaryan authored Mar 29, 2022
1 parent 55c365b commit 831373a
Show file tree
Hide file tree
Showing 8 changed files with 501 additions and 50 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
In case of type mismatch, they don't panic right away but return an invalid zero-initialized
instance for consistency with other OneOf field accessors (#5034)
- Update OTLP to v0.15.0 (#5064)
- Adding support for transition from older versions of OTLP to OTLP v0.15.0 (#5085)

### 🧰 Bug fixes 🧰

Expand Down
4 changes: 4 additions & 0 deletions model/otlp/json_unmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
otlpmetrics "go.opentelemetry.io/collector/model/internal/data/protogen/metrics/v1"
otlptrace "go.opentelemetry.io/collector/model/internal/data/protogen/trace/v1"
ipdata "go.opentelemetry.io/collector/model/internal/pdata"
"go.opentelemetry.io/collector/model/otlpgrpc"
"go.opentelemetry.io/collector/model/pdata"
)

Expand Down Expand Up @@ -54,6 +55,7 @@ func (d *jsonUnmarshaler) UnmarshalLogs(buf []byte) (pdata.Logs, error) {
if err := d.delegate.Unmarshal(bytes.NewReader(buf), ld); err != nil {
return pdata.Logs{}, err
}
otlpgrpc.InstrumentationLibraryLogsToScope(ld.ResourceLogs)
return ipdata.LogsFromOtlp(ld), nil
}

Expand All @@ -62,6 +64,7 @@ func (d *jsonUnmarshaler) UnmarshalMetrics(buf []byte) (pdata.Metrics, error) {
if err := d.delegate.Unmarshal(bytes.NewReader(buf), md); err != nil {
return pdata.Metrics{}, err
}
otlpgrpc.InstrumentationLibraryMetricsToScope(md.ResourceMetrics)
return ipdata.MetricsFromOtlp(md), nil
}

Expand All @@ -70,5 +73,6 @@ func (d *jsonUnmarshaler) UnmarshalTraces(buf []byte) (pdata.Traces, error) {
if err := d.delegate.Unmarshal(bytes.NewReader(buf), td); err != nil {
return pdata.Traces{}, err
}
otlpgrpc.InstrumentationLibrarySpansToScope(td.ResourceSpans)
return ipdata.TracesFromOtlp(td), nil
}
39 changes: 37 additions & 2 deletions model/otlpgrpc/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"google.golang.org/grpc"

otlpcollectorlog "go.opentelemetry.io/collector/model/internal/data/protogen/collector/logs/v1"
v1 "go.opentelemetry.io/collector/model/internal/data/protogen/common/v1"
otlplogs "go.opentelemetry.io/collector/model/internal/data/protogen/logs/v1"
ipdata "go.opentelemetry.io/collector/model/internal/pdata"
"go.opentelemetry.io/collector/model/pdata"
Expand Down Expand Up @@ -119,7 +120,11 @@ func (lr LogsRequest) MarshalProto() ([]byte, error) {

// UnmarshalProto unmarshalls LogsRequest from proto bytes.
func (lr LogsRequest) UnmarshalProto(data []byte) error {
return lr.orig.Unmarshal(data)
if err := lr.orig.Unmarshal(data); err != nil {
return err
}
InstrumentationLibraryLogsToScope(lr.orig.ResourceLogs)
return nil
}

// MarshalJSON marshals LogsRequest into JSON bytes.
Expand All @@ -133,7 +138,11 @@ func (lr LogsRequest) MarshalJSON() ([]byte, error) {

// UnmarshalJSON unmarshalls LogsRequest from JSON bytes.
func (lr LogsRequest) UnmarshalJSON(data []byte) error {
return jsonUnmarshaler.Unmarshal(bytes.NewReader(data), lr.orig)
if err := jsonUnmarshaler.Unmarshal(bytes.NewReader(data), lr.orig); err != nil {
return err
}
InstrumentationLibraryLogsToScope(lr.orig.ResourceLogs)
return nil
}

func (lr LogsRequest) SetLogs(ld pdata.Logs) {
Expand Down Expand Up @@ -191,3 +200,29 @@ func (s rawLogsServer) Export(ctx context.Context, request *otlpcollectorlog.Exp
rsp, err := s.srv.Export(ctx, LogsRequest{orig: request})
return rsp.orig, err
}

// InstrumentationLibraryLogsToScope implements the translation of resource logs data
// following the v0.15.0 upgrade:
// receivers SHOULD check if instrumentation_library_logs is set
// and scope_logs is not set then the value in instrumentation_library_logs
// SHOULD be used instead by converting InstrumentationLibraryLogs into ScopeLogs.
// If scope_logs is set then instrumentation_library_logs SHOULD be ignored.
// https://github.com/open-telemetry/opentelemetry-proto/blob/3c2915c01a9fb37abfc0415ec71247c4978386b0/opentelemetry/proto/logs/v1/logs.proto#L58
func InstrumentationLibraryLogsToScope(rls []*otlplogs.ResourceLogs) {
for _, rl := range rls {
if len(rl.ScopeLogs) == 0 {
for _, ill := range rl.InstrumentationLibraryLogs {
scopeLogs := otlplogs.ScopeLogs{
Scope: v1.InstrumentationScope{
Name: ill.InstrumentationLibrary.Name,
Version: ill.InstrumentationLibrary.Version,
},
LogRecords: ill.LogRecords,
SchemaUrl: ill.SchemaUrl,
}
rl.ScopeLogs = append(rl.ScopeLogs, &scopeLogs)
}
}
rl.InstrumentationLibraryLogs = nil
}
}
149 changes: 134 additions & 15 deletions model/otlpgrpc/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"google.golang.org/grpc/status"
"google.golang.org/grpc/test/bufconn"

v1 "go.opentelemetry.io/collector/model/internal/data/protogen/logs/v1"
"go.opentelemetry.io/collector/model/internal/pdata"
)

Expand All @@ -42,27 +43,88 @@ var _ json.Marshaler = LogsRequest{}

var logsRequestJSON = []byte(`
{
"resourceLogs": [
"resourceLogs": [
{
"resource": {},
"scopeLogs": [
{
"scope": {},
"logRecords": [
"resource": {},
"scopeLogs": [
{
"body": {
"stringValue": "test_log_record"
},
"traceId": "",
"spanId": ""
"scope": {},
"logRecords": [
{
"body": {
"stringValue": "test_log_record"
},
"traceId": "",
"spanId": ""
}
]
}
]
}
]
]
}
]
]
}`)

var logsTransitionData = [][]byte{
[]byte(`
{
"resourceLogs": [
{
"resource": {},
"instrumentationLibraryLogs": [
{
"instrumentationLibrary": {},
"logRecords": [
{
"body": {
"stringValue": "test_log_record"
},
"traceId": "",
"spanId": ""
}
]
}
]
}
]
}`),
[]byte(`
{
"resourceLogs": [
{
"resource": {},
"instrumentationLibraryLogs": [
{
"instrumentationLibrary": {},
"logRecords": [
{
"body": {
"stringValue": "test_log_record"
},
"traceId": "",
"spanId": ""
}
]
}
],
"scopeLogs": [
{
"scope": {},
"logRecords": [
{
"body": {
"stringValue": "test_log_record"
},
"traceId": "",
"spanId": ""
}
]
}
]
}
]
}`),
}

func TestLogsRequestJSON(t *testing.T) {
lr := NewLogsRequest()
assert.NoError(t, lr.UnmarshalJSON(logsRequestJSON))
Expand All @@ -73,6 +135,18 @@ func TestLogsRequestJSON(t *testing.T) {
assert.Equal(t, strings.Join(strings.Fields(string(logsRequestJSON)), ""), string(got))
}

func TestLogsRequestJSONTransition(t *testing.T) {
for _, data := range logsTransitionData {
lr := NewLogsRequest()
assert.NoError(t, lr.UnmarshalJSON(data))
assert.Equal(t, "test_log_record", lr.Logs().ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().AsString())

got, err := lr.MarshalJSON()
assert.NoError(t, err)
assert.Equal(t, strings.Join(strings.Fields(string(logsRequestJSON)), ""), string(got))
}
}

func TestLogsRequestJSON_Deprecated(t *testing.T) {
lr, err := UnmarshalJSONLogsRequest(logsRequestJSON)
assert.NoError(t, err)
Expand Down Expand Up @@ -116,6 +190,41 @@ func TestLogsGrpc(t *testing.T) {
assert.Equal(t, NewLogsResponse(), resp)
}

func TestLogsGrpcTransition(t *testing.T) {
lis := bufconn.Listen(1024 * 1024)
s := grpc.NewServer()
RegisterLogsServer(s, &fakeLogsServer{t: t})
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, s.Serve(lis))
}()
t.Cleanup(func() {
s.Stop()
wg.Wait()
})

cc, err := grpc.Dial("bufnet",
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return lis.Dial()
}),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock())
assert.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, cc.Close())
})

logClient := NewLogsClient(cc)

req := generateLogsRequestWithInstrumentationLibrary()
InstrumentationLibraryLogsToScope(req.orig.ResourceLogs)
resp, err := logClient.Export(context.Background(), req)
assert.NoError(t, err)
assert.Equal(t, NewLogsResponse(), resp)
}

func TestLogsGrpcError(t *testing.T) {
lis := bufconn.Listen(1024 * 1024)
s := grpc.NewServer()
Expand Down Expand Up @@ -170,3 +279,13 @@ func generateLogsRequest() LogsRequest {
lr.SetLogs(ld)
return lr
}

func generateLogsRequestWithInstrumentationLibrary() LogsRequest {
lr := generateLogsRequest()
lr.orig.ResourceLogs[0].InstrumentationLibraryLogs = []*v1.InstrumentationLibraryLogs{ //nolint:staticcheck // SA1019 ignore this!
{
LogRecords: lr.orig.ResourceLogs[0].ScopeLogs[0].LogRecords,
},
}
return lr
}
33 changes: 32 additions & 1 deletion model/otlpgrpc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"google.golang.org/grpc"

otlpcollectormetrics "go.opentelemetry.io/collector/model/internal/data/protogen/collector/metrics/v1"
v1 "go.opentelemetry.io/collector/model/internal/data/protogen/common/v1"
otlpmetrics "go.opentelemetry.io/collector/model/internal/data/protogen/metrics/v1"
ipdata "go.opentelemetry.io/collector/model/internal/pdata"
"go.opentelemetry.io/collector/model/pdata"
Expand Down Expand Up @@ -129,7 +130,11 @@ func (mr MetricsRequest) MarshalJSON() ([]byte, error) {

// UnmarshalJSON unmarshalls MetricsRequest from JSON bytes.
func (mr MetricsRequest) UnmarshalJSON(data []byte) error {
return jsonUnmarshaler.Unmarshal(bytes.NewReader(data), mr.orig)
if err := jsonUnmarshaler.Unmarshal(bytes.NewReader(data), mr.orig); err != nil {
return err
}
InstrumentationLibraryMetricsToScope(mr.orig.ResourceMetrics)
return nil
}

func (mr MetricsRequest) SetMetrics(ld pdata.Metrics) {
Expand Down Expand Up @@ -187,3 +192,29 @@ func (s rawMetricsServer) Export(ctx context.Context, request *otlpcollectormetr
rsp, err := s.srv.Export(ctx, MetricsRequest{orig: request})
return rsp.orig, err
}

// InstrumentationLibraryMetricsToScope implements the translation of resource metrics data
// following the v0.15.0 upgrade:
// receivers SHOULD check if instrumentation_library_metrics is set
// and scope_metrics is not set then the value in instrumentation_library_metrics
// SHOULD be used instead by converting InstrumentationLibraryMetrics into ScopeMetrics.
// If scope_metrics is set then instrumentation_library_metrics SHOULD be ignored.
// https://github.com/open-telemetry/opentelemetry-proto/blob/3c2915c01a9fb37abfc0415ec71247c4978386b0/opentelemetry/proto/metrics/v1/metrics.proto#L58
func InstrumentationLibraryMetricsToScope(rms []*otlpmetrics.ResourceMetrics) {
for _, rm := range rms {
if len(rm.ScopeMetrics) == 0 {
for _, ilm := range rm.InstrumentationLibraryMetrics {
scopeMetrics := otlpmetrics.ScopeMetrics{
Scope: v1.InstrumentationScope{
Name: ilm.InstrumentationLibrary.Name,
Version: ilm.InstrumentationLibrary.Version,
},
Metrics: ilm.Metrics,
SchemaUrl: ilm.SchemaUrl,
}
rm.ScopeMetrics = append(rm.ScopeMetrics, &scopeMetrics)
}
}
rm.InstrumentationLibraryMetrics = nil
}
}
Loading

0 comments on commit 831373a

Please sign in to comment.