Skip to content

Commit

Permalink
[receiver/otlp] fix otlp <0.15.0 receiver bug (#5189)
Browse files Browse the repository at this point in the history
* [receiver/otlp] fix otlp <0.15.0 receiver bug

The codepath through the OTLP receiver for gRPC was not translating the `InstrumentationLibrary*` to `Scope*`.

* update changelog

* add unit tests

* cleanup fake server name
  • Loading branch information
Alex Boten authored Apr 12, 2022
1 parent 6ede324 commit 31d39a1
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
- Setup the correct meter provider if telemetry.useOtelForInternalMetrics featuregate enabled (#5146)
- Fix pdata.Value.asRaw() to correctly return elements of Slice and Map type (#5153)
- Update pdata.Slice.asRaw() to return raw representation of Slice and Map elements (#5157)
- The codepath through the OTLP receiver for gRPC was not translating the InstrumentationLibrary* to Scope* (#5189)

## v0.48.0 Beta

Expand Down
1 change: 1 addition & 0 deletions pdata/plog/plogotlp/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ type rawLogsServer struct {
}

func (s rawLogsServer) Export(ctx context.Context, request *otlpcollectorlog.ExportLogsServiceRequest) (*otlpcollectorlog.ExportLogsServiceResponse, error) {
otlp.InstrumentationLibraryLogsToScope(request.ResourceLogs)
rsp, err := s.srv.Export(ctx, Request{orig: request})
return rsp.orig, err
}
43 changes: 43 additions & 0 deletions pdata/plog/plogotlp/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,48 @@ func TestGrpcTransition(t *testing.T) {
assert.Equal(t, NewResponse(), resp)
}

type fakeRawServer struct {
t *testing.T
}

func (s fakeRawServer) Export(_ context.Context, req Request) (Response, error) {
assert.Equal(s.t, 1, req.Logs().LogRecordCount())
return NewResponse(), nil
}

func TestGrpcExport(t *testing.T) {
lis := bufconn.Listen(1024 * 1024)
s := grpc.NewServer()
RegisterServer(s, &fakeRawServer{t: t})
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, s.Serve(lis))
}()
t.Cleanup(func() {
s.Stop()
wg.Wait()
})

cc, err := grpc.Dial("bufnet",
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return lis.Dial()
}),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock())
assert.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, cc.Close())
})

logClient := NewClient(cc)

resp, err := logClient.Export(context.Background(), generateLogsRequestWithInstrumentationLibrary())
assert.NoError(t, err)
assert.Equal(t, NewResponse(), resp)
}

func TestGrpcError(t *testing.T) {
lis := bufconn.Listen(1024 * 1024)
s := grpc.NewServer()
Expand Down Expand Up @@ -278,5 +320,6 @@ func generateLogsRequestWithInstrumentationLibrary() Request {
LogRecords: lr.orig.ResourceLogs[0].ScopeLogs[0].LogRecords,
},
}
lr.orig.ResourceLogs[0].ScopeLogs = []*v1.ScopeLogs{}
return lr
}
1 change: 1 addition & 0 deletions pdata/pmetric/pmetricotlp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ type rawMetricsServer struct {
}

func (s rawMetricsServer) Export(ctx context.Context, request *otlpcollectormetrics.ExportMetricsServiceRequest) (*otlpcollectormetrics.ExportMetricsServiceResponse, error) {
otlp.InstrumentationLibraryMetricsToScope(request.ResourceMetrics)
rsp, err := s.srv.Export(ctx, Request{orig: request})
return rsp.orig, err
}
48 changes: 47 additions & 1 deletion pdata/pmetric/pmetricotlp/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,48 @@ func TestGrpcTransition(t *testing.T) {
assert.Equal(t, NewResponse(), resp)
}

type fakeRawServer struct {
t *testing.T
}

func (s fakeRawServer) Export(_ context.Context, req Request) (Response, error) {
assert.Equal(s.t, 1, req.Metrics().DataPointCount())
return NewResponse(), nil
}

func TestGrpcExport(t *testing.T) {
lis := bufconn.Listen(1024 * 1024)
s := grpc.NewServer()
RegisterServer(s, &fakeRawServer{t: t})
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, s.Serve(lis))
}()
t.Cleanup(func() {
s.Stop()
wg.Wait()
})

cc, err := grpc.Dial("bufnet",
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return lis.Dial()
}),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock())
assert.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, cc.Close())
})

metricClient := NewClient(cc)

resp, err := metricClient.Export(context.Background(), generateMetricsRequestWithInstrumentationLibrary())
assert.NoError(t, err)
assert.Equal(t, NewResponse(), resp)
}

func TestGrpcError(t *testing.T) {
lis := bufconn.Listen(1024 * 1024)
s := grpc.NewServer()
Expand Down Expand Up @@ -248,7 +290,10 @@ func (f fakeMetricsServer) Export(_ context.Context, request Request) (Response,

func generateMetricsRequest() Request {
md := pmetric.NewMetrics()
md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetName("test_metric")
m := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
m.SetName("test_metric")
m.SetDataType(pmetric.MetricDataTypeGauge)
m.Gauge().DataPoints().AppendEmpty()

mr := NewRequest()
mr.SetMetrics(md)
Expand All @@ -262,5 +307,6 @@ func generateMetricsRequestWithInstrumentationLibrary() Request {
Metrics: mr.orig.ResourceMetrics[0].ScopeMetrics[0].Metrics,
},
}
mr.orig.ResourceMetrics[0].ScopeMetrics = []*v1.ScopeMetrics{}
return mr
}
1 change: 1 addition & 0 deletions pdata/ptrace/ptraceotlp/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ type rawTracesServer struct {
}

func (s rawTracesServer) Export(ctx context.Context, request *otlpcollectortrace.ExportTraceServiceRequest) (*otlpcollectortrace.ExportTraceServiceResponse, error) {
otlp.InstrumentationLibrarySpansToScope(request.ResourceSpans)
rsp, err := s.srv.Export(ctx, Request{orig: request})
return rsp.orig, err
}
44 changes: 44 additions & 0 deletions pdata/ptrace/ptraceotlp/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,49 @@ func TestGrpcTransition(t *testing.T) {
assert.Equal(t, NewResponse(), resp)
}

type fakeRawServer struct {
t *testing.T
}

func (s fakeRawServer) Export(_ context.Context, req Request) (Response, error) {
assert.Equal(s.t, 1, req.Traces().SpanCount())
return NewResponse(), nil
}

func TestGrpcExport(t *testing.T) {
lis := bufconn.Listen(1024 * 1024)
s := grpc.NewServer()

RegisterServer(s, &fakeRawServer{t: t})
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, s.Serve(lis))
}()
t.Cleanup(func() {
s.Stop()
wg.Wait()
})

cc, err := grpc.Dial("bufnet",
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return lis.Dial()
}),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock())
assert.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, cc.Close())
})
traceClient := NewClient(cc)

req := generateTracesRequestWithInstrumentationLibrary()
resp, err := traceClient.Export(context.Background(), req)
assert.NoError(t, err)
assert.Equal(t, NewResponse(), resp)
}

func TestGrpcError(t *testing.T) {
lis := bufconn.Listen(1024 * 1024)
s := grpc.NewServer()
Expand Down Expand Up @@ -278,5 +321,6 @@ func generateTracesRequestWithInstrumentationLibrary() Request {
Spans: tr.orig.ResourceSpans[0].ScopeSpans[0].Spans,
},
}
tr.orig.ResourceSpans[0].ScopeSpans = []*v1.ScopeSpans{}
return tr
}

0 comments on commit 31d39a1

Please sign in to comment.