diff --git a/CHANGELOG.md b/CHANGELOG.md index 6719d57fe88..32c7be9bfe2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ - Setup the correct meter provider if telemetry.useOtelForInternalMetrics featuregate enabled (#5146) - Fix pdata.Value.asRaw() to correctly return elements of Slice and Map type (#5153) - Update pdata.Slice.asRaw() to return raw representation of Slice and Map elements (#5157) +- The codepath through the OTLP receiver for gRPC was not translating the InstrumentationLibrary* to Scope* (#5189) ## v0.48.0 Beta diff --git a/pdata/plog/plogotlp/logs.go b/pdata/plog/plogotlp/logs.go index 5e380cc4973..abb40af72f8 100644 --- a/pdata/plog/plogotlp/logs.go +++ b/pdata/plog/plogotlp/logs.go @@ -159,6 +159,7 @@ type rawLogsServer struct { } func (s rawLogsServer) Export(ctx context.Context, request *otlpcollectorlog.ExportLogsServiceRequest) (*otlpcollectorlog.ExportLogsServiceResponse, error) { + otlp.InstrumentationLibraryLogsToScope(request.ResourceLogs) rsp, err := s.srv.Export(ctx, Request{orig: request}) return rsp.orig, err } diff --git a/pdata/plog/plogotlp/logs_test.go b/pdata/plog/plogotlp/logs_test.go index 9e29309f5f0..37b5c0065c7 100644 --- a/pdata/plog/plogotlp/logs_test.go +++ b/pdata/plog/plogotlp/logs_test.go @@ -216,6 +216,48 @@ func TestGrpcTransition(t *testing.T) { assert.Equal(t, NewResponse(), resp) } +type fakeRawServer struct { + t *testing.T +} + +func (s fakeRawServer) Export(_ context.Context, req Request) (Response, error) { + assert.Equal(s.t, 1, req.Logs().LogRecordCount()) + return NewResponse(), nil +} + +func TestGrpcExport(t *testing.T) { + lis := bufconn.Listen(1024 * 1024) + s := grpc.NewServer() + RegisterServer(s, &fakeRawServer{t: t}) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + assert.NoError(t, s.Serve(lis)) + }() + t.Cleanup(func() { + s.Stop() + wg.Wait() + }) + + cc, err := grpc.Dial("bufnet", + grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { + return lis.Dial() + }), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock()) + assert.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, cc.Close()) + }) + + logClient := NewClient(cc) + + resp, err := logClient.Export(context.Background(), generateLogsRequestWithInstrumentationLibrary()) + assert.NoError(t, err) + assert.Equal(t, NewResponse(), resp) +} + func TestGrpcError(t *testing.T) { lis := bufconn.Listen(1024 * 1024) s := grpc.NewServer() @@ -278,5 +320,6 @@ func generateLogsRequestWithInstrumentationLibrary() Request { LogRecords: lr.orig.ResourceLogs[0].ScopeLogs[0].LogRecords, }, } + lr.orig.ResourceLogs[0].ScopeLogs = []*v1.ScopeLogs{} return lr } diff --git a/pdata/pmetric/pmetricotlp/metrics.go b/pdata/pmetric/pmetricotlp/metrics.go index 5d9b3021dbd..c0a758d1986 100644 --- a/pdata/pmetric/pmetricotlp/metrics.go +++ b/pdata/pmetric/pmetricotlp/metrics.go @@ -155,6 +155,7 @@ type rawMetricsServer struct { } func (s rawMetricsServer) Export(ctx context.Context, request *otlpcollectormetrics.ExportMetricsServiceRequest) (*otlpcollectormetrics.ExportMetricsServiceResponse, error) { + otlp.InstrumentationLibraryMetricsToScope(request.ResourceMetrics) rsp, err := s.srv.Export(ctx, Request{orig: request}) return rsp.orig, err } diff --git a/pdata/pmetric/pmetricotlp/metrics_test.go b/pdata/pmetric/pmetricotlp/metrics_test.go index 976efca357a..fb7c9ba77ba 100644 --- a/pdata/pmetric/pmetricotlp/metrics_test.go +++ b/pdata/pmetric/pmetricotlp/metrics_test.go @@ -200,6 +200,48 @@ func TestGrpcTransition(t *testing.T) { assert.Equal(t, NewResponse(), resp) } +type fakeRawServer struct { + t *testing.T +} + +func (s fakeRawServer) Export(_ context.Context, req Request) (Response, error) { + assert.Equal(s.t, 1, req.Metrics().DataPointCount()) + return NewResponse(), nil +} + +func TestGrpcExport(t *testing.T) { + lis := bufconn.Listen(1024 * 1024) + s := grpc.NewServer() + RegisterServer(s, &fakeRawServer{t: t}) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + assert.NoError(t, s.Serve(lis)) + }() + t.Cleanup(func() { + s.Stop() + wg.Wait() + }) + + cc, err := grpc.Dial("bufnet", + grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { + return lis.Dial() + }), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock()) + assert.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, cc.Close()) + }) + + metricClient := NewClient(cc) + + resp, err := metricClient.Export(context.Background(), generateMetricsRequestWithInstrumentationLibrary()) + assert.NoError(t, err) + assert.Equal(t, NewResponse(), resp) +} + func TestGrpcError(t *testing.T) { lis := bufconn.Listen(1024 * 1024) s := grpc.NewServer() @@ -248,7 +290,10 @@ func (f fakeMetricsServer) Export(_ context.Context, request Request) (Response, func generateMetricsRequest() Request { md := pmetric.NewMetrics() - md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetName("test_metric") + m := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + m.SetName("test_metric") + m.SetDataType(pmetric.MetricDataTypeGauge) + m.Gauge().DataPoints().AppendEmpty() mr := NewRequest() mr.SetMetrics(md) @@ -262,5 +307,6 @@ func generateMetricsRequestWithInstrumentationLibrary() Request { Metrics: mr.orig.ResourceMetrics[0].ScopeMetrics[0].Metrics, }, } + mr.orig.ResourceMetrics[0].ScopeMetrics = []*v1.ScopeMetrics{} return mr } diff --git a/pdata/ptrace/ptraceotlp/traces.go b/pdata/ptrace/ptraceotlp/traces.go index 95f87ba74f3..903c3a7ec84 100644 --- a/pdata/ptrace/ptraceotlp/traces.go +++ b/pdata/ptrace/ptraceotlp/traces.go @@ -160,6 +160,7 @@ type rawTracesServer struct { } func (s rawTracesServer) Export(ctx context.Context, request *otlpcollectortrace.ExportTraceServiceRequest) (*otlpcollectortrace.ExportTraceServiceResponse, error) { + otlp.InstrumentationLibrarySpansToScope(request.ResourceSpans) rsp, err := s.srv.Export(ctx, Request{orig: request}) return rsp.orig, err } diff --git a/pdata/ptrace/ptraceotlp/traces_test.go b/pdata/ptrace/ptraceotlp/traces_test.go index 8af98b3ae27..235507b8f2a 100644 --- a/pdata/ptrace/ptraceotlp/traces_test.go +++ b/pdata/ptrace/ptraceotlp/traces_test.go @@ -216,6 +216,49 @@ func TestGrpcTransition(t *testing.T) { assert.Equal(t, NewResponse(), resp) } +type fakeRawServer struct { + t *testing.T +} + +func (s fakeRawServer) Export(_ context.Context, req Request) (Response, error) { + assert.Equal(s.t, 1, req.Traces().SpanCount()) + return NewResponse(), nil +} + +func TestGrpcExport(t *testing.T) { + lis := bufconn.Listen(1024 * 1024) + s := grpc.NewServer() + + RegisterServer(s, &fakeRawServer{t: t}) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + assert.NoError(t, s.Serve(lis)) + }() + t.Cleanup(func() { + s.Stop() + wg.Wait() + }) + + cc, err := grpc.Dial("bufnet", + grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { + return lis.Dial() + }), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock()) + assert.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, cc.Close()) + }) + traceClient := NewClient(cc) + + req := generateTracesRequestWithInstrumentationLibrary() + resp, err := traceClient.Export(context.Background(), req) + assert.NoError(t, err) + assert.Equal(t, NewResponse(), resp) +} + func TestGrpcError(t *testing.T) { lis := bufconn.Listen(1024 * 1024) s := grpc.NewServer() @@ -278,5 +321,6 @@ func generateTracesRequestWithInstrumentationLibrary() Request { Spans: tr.orig.ResourceSpans[0].ScopeSpans[0].Spans, }, } + tr.orig.ResourceSpans[0].ScopeSpans = []*v1.ScopeSpans{} return tr }