Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix otlpreceiver transport metrics attribute (#6784)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>

Signed-off-by: Bogdan Drutu <[email protected]>
bogdandrutu authored Dec 13, 2022
1 parent 4b82862 commit def5561
Showing 13 changed files with 262 additions and 183 deletions.
11 changes: 11 additions & 0 deletions .chloggen/fixotlptransport.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: otlpreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix otlpreceiver transport metrics attribute

# One or more tracking issues or pull requests related to the change
issues: [6695]
12 changes: 6 additions & 6 deletions connector/forwardconnector/forward.go
Original file line number Diff line number Diff line change
@@ -59,8 +59,8 @@ func (f *forwardFactory) createTracesToTraces(
cfg component.Config,
nextConsumer consumer.Traces,
) (connector.Traces, error) {
comp := f.GetOrAdd(cfg, func() component.Component {
return &forward{}
comp, _ := f.GetOrAdd(cfg, func() (component.Component, error) {
return &forward{}, nil
})

conn := comp.Unwrap().(*forward)
@@ -75,8 +75,8 @@ func (f *forwardFactory) createMetricsToMetrics(
cfg component.Config,
nextConsumer consumer.Metrics,
) (connector.Metrics, error) {
comp := f.GetOrAdd(cfg, func() component.Component {
return &forward{}
comp, _ := f.GetOrAdd(cfg, func() (component.Component, error) {
return &forward{}, nil
})

conn := comp.Unwrap().(*forward)
@@ -91,8 +91,8 @@ func (f *forwardFactory) createLogsToLogs(
cfg component.Config,
nextConsumer consumer.Logs,
) (connector.Logs, error) {
comp := f.GetOrAdd(cfg, func() component.Component {
return &forward{}
comp, _ := f.GetOrAdd(cfg, func() (component.Component, error) {
return &forward{}, nil
})

conn := comp.Unwrap().(*forward)
12 changes: 8 additions & 4 deletions internal/sharedcomponent/sharedcomponent.go
Original file line number Diff line number Diff line change
@@ -38,18 +38,22 @@ func NewSharedComponents() *SharedComponents {

// GetOrAdd returns the already created instance if exists, otherwise creates a new instance
// and adds it to the map of references.
func (scs *SharedComponents) GetOrAdd(key interface{}, create func() component.Component) *SharedComponent {
func (scs *SharedComponents) GetOrAdd(key interface{}, create func() (component.Component, error)) (*SharedComponent, error) {
if c, ok := scs.comps[key]; ok {
return c
return c, nil
}
comp, err := create()
if err != nil {
return nil, err
}
newComp := &SharedComponent{
Component: create(),
Component: comp,
removeFunc: func() {
delete(scs.comps, key)
},
}
scs.comps[key] = newComp
return newComp
return newComp, nil
}

// SharedComponent ensures that the wrapped component is started and stopped only once.
29 changes: 22 additions & 7 deletions internal/sharedcomponent/sharedcomponent_test.go
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
@@ -37,20 +38,33 @@ func TestNewSharedComponents(t *testing.T) {
assert.Len(t, comps.comps, 0)
}

func TestSharedComponents_GetOrAdd(t *testing.T) {
func TestNewSharedComponentsCreateError(t *testing.T) {
comps := NewSharedComponents()
assert.Len(t, comps.comps, 0)
myErr := errors.New("my error")
_, err := comps.GetOrAdd(id, func() (component.Component, error) { return nil, myErr })
assert.ErrorIs(t, err, myErr)
assert.Len(t, comps.comps, 0)
}

func TestSharedComponentsGetOrAdd(t *testing.T) {
nop := &baseComponent{}
createNop := func() component.Component { return nop }

comps := NewSharedComponents()
got := comps.GetOrAdd(id, createNop)
got, err := comps.GetOrAdd(id, func() (component.Component, error) { return nop, nil })
require.NoError(t, err)
assert.Len(t, comps.comps, 1)
assert.Same(t, nop, got.Unwrap())
assert.Same(t, got, comps.GetOrAdd(id, createNop))
gotSecond, err := comps.GetOrAdd(id, func() (component.Component, error) { panic("should not be called") })
require.NoError(t, err)
assert.Same(t, got, gotSecond)

// Shutdown nop will remove
assert.NoError(t, got.Shutdown(context.Background()))
assert.Len(t, comps.comps, 0)
assert.NotSame(t, got, comps.GetOrAdd(id, createNop))
gotThird, err := comps.GetOrAdd(id, func() (component.Component, error) { return nop, nil })
require.NoError(t, err)
assert.NotSame(t, got, gotThird)
}

func TestSharedComponent(t *testing.T) {
@@ -66,10 +80,11 @@ func TestSharedComponent(t *testing.T) {
calledStop++
return wantErr
}}
createComp := func() component.Component { return comp }

comps := NewSharedComponents()
got := comps.GetOrAdd(id, createComp)
got, err := comps.GetOrAdd(id, func() (component.Component, error) { return comp, nil })
require.NoError(t, err)

assert.Equal(t, wantErr, got.Start(context.Background(), componenttest.NewNopHost()))
assert.Equal(t, 1, calledStart)
// Second time is not called anymore.
21 changes: 15 additions & 6 deletions receiver/otlpreceiver/factory.go
Original file line number Diff line number Diff line change
@@ -69,11 +69,14 @@ func createTraces(
cfg component.Config,
nextConsumer consumer.Traces,
) (receiver.Traces, error) {
r := receivers.GetOrAdd(cfg, func() component.Component {
r, err := receivers.GetOrAdd(cfg, func() (component.Component, error) {
return newOtlpReceiver(cfg.(*Config), set)
})
if err != nil {
return nil, err
}

if err := r.Unwrap().(*otlpReceiver).registerTraceConsumer(nextConsumer); err != nil {
if err = r.Unwrap().(*otlpReceiver).registerTraceConsumer(nextConsumer); err != nil {
return nil, err
}
return r, nil
@@ -86,11 +89,14 @@ func createMetrics(
cfg component.Config,
consumer consumer.Metrics,
) (receiver.Metrics, error) {
r := receivers.GetOrAdd(cfg, func() component.Component {
r, err := receivers.GetOrAdd(cfg, func() (component.Component, error) {
return newOtlpReceiver(cfg.(*Config), set)
})
if err != nil {
return nil, err
}

if err := r.Unwrap().(*otlpReceiver).registerMetricsConsumer(consumer); err != nil {
if err = r.Unwrap().(*otlpReceiver).registerMetricsConsumer(consumer); err != nil {
return nil, err
}
return r, nil
@@ -103,11 +109,14 @@ func createLog(
cfg component.Config,
consumer consumer.Logs,
) (receiver.Logs, error) {
r := receivers.GetOrAdd(cfg, func() component.Component {
r, err := receivers.GetOrAdd(cfg, func() (component.Component, error) {
return newOtlpReceiver(cfg.(*Config), set)
})
if err != nil {
return nil, err
}

if err := r.Unwrap().(*otlpReceiver).registerLogsConsumer(consumer); err != nil {
if err = r.Unwrap().(*otlpReceiver).registerLogsConsumer(consumer); err != nil {
return nil, err
}
return r, nil
15 changes: 2 additions & 13 deletions receiver/otlpreceiver/internal/logs/otlp.go
Original file line number Diff line number Diff line change
@@ -20,12 +20,10 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/collector/receiver"
)

const (
dataFormatProtobuf = "protobuf"
receiverTransport = "grpc"
)

// Receiver is the type used to handle spans from OpenTelemetry exporters.
@@ -35,20 +33,11 @@ type Receiver struct {
}

// New creates a new Receiver reference.
func New(nextConsumer consumer.Logs, set receiver.CreateSettings) (*Receiver, error) {
obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{
ReceiverID: set.ID,
Transport: receiverTransport,
ReceiverCreateSettings: set,
})
if err != nil {
return nil, err
}

func New(nextConsumer consumer.Logs, obsrecv *obsreport.Receiver) *Receiver {
return &Receiver{
nextConsumer: nextConsumer,
obsrecv: obsrecv,
}, nil
}
}

// Export implements the service Export logs func.
8 changes: 7 additions & 1 deletion receiver/otlpreceiver/internal/logs/otlp_test.go
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/collector/receiver/receivertest"
)
@@ -88,8 +89,13 @@ func otlpReceiverOnGRPCServer(t *testing.T, lc consumer.Logs) net.Addr {

set := receivertest.NewNopCreateSettings()
set.ID = component.NewIDWithName("otlp", "log")
r, err := New(lc, set)
obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{
ReceiverID: set.ID,
Transport: "grpc",
ReceiverCreateSettings: set,
})
require.NoError(t, err)
r := New(lc, obsrecv)
// Now run it as a gRPC server
srv := grpc.NewServer()
plogotlp.RegisterGRPCServer(srv, r)
14 changes: 2 additions & 12 deletions receiver/otlpreceiver/internal/metrics/otlp.go
Original file line number Diff line number Diff line change
@@ -20,7 +20,6 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"go.opentelemetry.io/collector/receiver"
)

const (
@@ -35,20 +34,11 @@ type Receiver struct {
}

// New creates a new Receiver reference.
func New(nextConsumer consumer.Metrics, set receiver.CreateSettings) (*Receiver, error) {
obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{
ReceiverID: set.ID,
Transport: receiverTransport,
ReceiverCreateSettings: set,
})
if err != nil {
return nil, err
}

func New(nextConsumer consumer.Metrics, obsrecv *obsreport.Receiver) *Receiver {
return &Receiver{
nextConsumer: nextConsumer,
obsrecv: obsrecv,
}, nil
}
}

// Export implements the service Export metrics func.
8 changes: 7 additions & 1 deletion receiver/otlpreceiver/internal/metrics/otlp_test.go
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"go.opentelemetry.io/collector/receiver/receivertest"
)
@@ -89,8 +90,13 @@ func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) net.Addr {

set := receivertest.NewNopCreateSettings()
set.ID = component.NewIDWithName("otlp", "metrics")
r, err := New(mc, set)
obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{
ReceiverID: set.ID,
Transport: "grpc",
ReceiverCreateSettings: set,
})
require.NoError(t, err)
r := New(mc, obsrecv)
// Now run it as a gRPC server
srv := grpc.NewServer()
pmetricotlp.RegisterGRPCServer(srv, r)
14 changes: 2 additions & 12 deletions receiver/otlpreceiver/internal/trace/otlp.go
Original file line number Diff line number Diff line change
@@ -20,7 +20,6 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
"go.opentelemetry.io/collector/receiver"
)

const (
@@ -35,20 +34,11 @@ type Receiver struct {
}

// New creates a new Receiver reference.
func New(nextConsumer consumer.Traces, set receiver.CreateSettings) (*Receiver, error) {
obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{
ReceiverID: set.ID,
Transport: receiverTransport,
ReceiverCreateSettings: set,
})
if err != nil {
return nil, err
}

func New(nextConsumer consumer.Traces, obsrecv *obsreport.Receiver) *Receiver {
return &Receiver{
nextConsumer: nextConsumer,
obsrecv: obsrecv,
}, nil
}
}

// Export implements the service Export traces func.
8 changes: 7 additions & 1 deletion receiver/otlpreceiver/internal/trace/otlp_test.go
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
"go.opentelemetry.io/collector/receiver/receivertest"
)
@@ -86,8 +87,13 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) net.Addr {

set := receivertest.NewNopCreateSettings()
set.ID = component.NewIDWithName("otlp", "trace")
r, err := New(tc, set)
obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{
ReceiverID: set.ID,
Transport: "grpc",
ReceiverCreateSettings: set,
})
require.NoError(t, err)
r := New(tc, obsrecv)
// Now run it as a gRPC server
srv := grpc.NewServer()
ptraceotlp.RegisterGRPCServer(srv, r)
75 changes: 43 additions & 32 deletions receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@ import (
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
@@ -45,27 +46,48 @@ type otlpReceiver struct {
httpMux *http.ServeMux
serverHTTP *http.Server

traceReceiver *trace.Receiver
tracesReceiver *trace.Receiver
metricsReceiver *metrics.Receiver
logReceiver *logs.Receiver
logsReceiver *logs.Receiver
shutdownWG sync.WaitGroup

obsrepGRPC *obsreport.Receiver
obsrepHTTP *obsreport.Receiver

settings receiver.CreateSettings
}

// newOtlpReceiver just creates the OpenTelemetry receiver services. It is the caller's
// responsibility to invoke the respective Start*Reception methods as well
// as the various Stop*Reception methods to end it.
func newOtlpReceiver(cfg *Config, settings receiver.CreateSettings) *otlpReceiver {
func newOtlpReceiver(cfg *Config, set receiver.CreateSettings) (*otlpReceiver, error) {
r := &otlpReceiver{
cfg: cfg,
settings: settings,
settings: set,
}
if cfg.HTTP != nil {
r.httpMux = http.NewServeMux()
}

return r
var err error
r.obsrepGRPC, err = obsreport.NewReceiver(obsreport.ReceiverSettings{
ReceiverID: set.ID,
Transport: "grpc",
ReceiverCreateSettings: set,
})
if err != nil {
return nil, err
}
r.obsrepHTTP, err = obsreport.NewReceiver(obsreport.ReceiverSettings{
ReceiverID: set.ID,
Transport: "http",
ReceiverCreateSettings: set,
})
if err != nil {
return nil, err
}

return r, nil
}

func (r *otlpReceiver) startGRPCServer(cfg *configgrpc.GRPCServerSettings, host component.Host) error {
@@ -112,16 +134,16 @@ func (r *otlpReceiver) startProtocolServers(host component.Host) error {
return err
}

if r.traceReceiver != nil {
ptraceotlp.RegisterGRPCServer(r.serverGRPC, r.traceReceiver)
if r.tracesReceiver != nil {
ptraceotlp.RegisterGRPCServer(r.serverGRPC, r.tracesReceiver)
}

if r.metricsReceiver != nil {
pmetricotlp.RegisterGRPCServer(r.serverGRPC, r.metricsReceiver)
}

if r.logReceiver != nil {
plogotlp.RegisterGRPCServer(r.serverGRPC, r.logReceiver)
if r.logsReceiver != nil {
plogotlp.RegisterGRPCServer(r.serverGRPC, r.logsReceiver)
}

err = r.startGRPCServer(r.cfg.GRPC, host)
@@ -175,11 +197,8 @@ func (r *otlpReceiver) registerTraceConsumer(tc consumer.Traces) error {
if tc == nil {
return component.ErrNilNextConsumer
}
var err error
r.traceReceiver, err = trace.New(tc, r.settings)
if err != nil {
return err
}
r.tracesReceiver = trace.New(tc, r.obsrepGRPC)
httpTracesReceiver := trace.New(tc, r.obsrepHTTP)
if r.httpMux != nil {
r.httpMux.HandleFunc("/v1/traces", func(resp http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
@@ -188,9 +207,9 @@ func (r *otlpReceiver) registerTraceConsumer(tc consumer.Traces) error {
}
switch req.Header.Get("Content-Type") {
case pbContentType:
handleTraces(resp, req, r.traceReceiver, pbEncoder)
handleTraces(resp, req, httpTracesReceiver, pbEncoder)
case jsonContentType:
handleTraces(resp, req, r.traceReceiver, jsEncoder)
handleTraces(resp, req, httpTracesReceiver, jsEncoder)
default:
handleUnmatchedContentType(resp)
}
@@ -203,12 +222,8 @@ func (r *otlpReceiver) registerMetricsConsumer(mc consumer.Metrics) error {
if mc == nil {
return component.ErrNilNextConsumer
}
var err error
r.metricsReceiver, err = metrics.New(mc, r.settings)
if err != nil {
return err
}

r.metricsReceiver = metrics.New(mc, r.obsrepGRPC)
httpMetricsReceiver := metrics.New(mc, r.obsrepHTTP)
if r.httpMux != nil {
r.httpMux.HandleFunc("/v1/metrics", func(resp http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
@@ -217,9 +232,9 @@ func (r *otlpReceiver) registerMetricsConsumer(mc consumer.Metrics) error {
}
switch req.Header.Get("Content-Type") {
case pbContentType:
handleMetrics(resp, req, r.metricsReceiver, pbEncoder)
handleMetrics(resp, req, httpMetricsReceiver, pbEncoder)
case jsonContentType:
handleMetrics(resp, req, r.metricsReceiver, jsEncoder)
handleMetrics(resp, req, httpMetricsReceiver, jsEncoder)
default:
handleUnmatchedContentType(resp)
}
@@ -232,12 +247,8 @@ func (r *otlpReceiver) registerLogsConsumer(lc consumer.Logs) error {
if lc == nil {
return component.ErrNilNextConsumer
}
var err error
r.logReceiver, err = logs.New(lc, r.settings)
if err != nil {
return err
}

r.logsReceiver = logs.New(lc, r.obsrepGRPC)
httpLogsReceiver := logs.New(lc, r.obsrepHTTP)
if r.httpMux != nil {
r.httpMux.HandleFunc("/v1/logs", func(resp http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
@@ -246,9 +257,9 @@ func (r *otlpReceiver) registerLogsConsumer(lc consumer.Logs) error {
}
switch req.Header.Get("Content-Type") {
case pbContentType:
handleLogs(resp, req, r.logReceiver, pbEncoder)
handleLogs(resp, req, httpLogsReceiver, pbEncoder)
case jsonContentType:
handleLogs(resp, req, r.logReceiver, jsEncoder)
handleLogs(resp, req, httpLogsReceiver, jsEncoder)
default:
handleUnmatchedContentType(resp)
}
218 changes: 130 additions & 88 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
@@ -347,7 +347,7 @@ func testHTTPJSONRequest(t *testing.T, url string, sink *errOrSinkConsumer, enco
buf = bytes.NewBuffer(traceJSON)
}
sink.SetConsumeError(expectedErr)
req, err := http.NewRequest("POST", url, buf)
req, err := http.NewRequest(http.MethodPost, url, buf)
require.NoError(t, err, "Error creating trace POST request: %v", err)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Content-Encoding", encoding)
@@ -357,13 +357,8 @@ func testHTTPJSONRequest(t *testing.T, url string, sink *errOrSinkConsumer, enco
require.NoError(t, err, "Error posting trace to http server: %v", err)

respBytes, err := io.ReadAll(resp.Body)
if err != nil {
t.Errorf("Error reading response from trace http server, %v", err)
}
err = resp.Body.Close()
if err != nil {
t.Errorf("Error closing response body, %v", err)
}
require.NoError(t, err)
require.NoError(t, resp.Body.Close())

allTraces := sink.AllTraces()
if expectedErr == nil {
@@ -428,9 +423,7 @@ func TestProtoHttp(t *testing.T) {
td := testdata.GenerateTraces(1)
marshaler := &ptrace.ProtoMarshaler{}
traceBytes, err := marshaler.MarshalTraces(td)
if err != nil {
t.Errorf("Error marshaling protobuf: %v", err)
}
require.NoError(t, err)

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
@@ -456,7 +449,7 @@ func createHTTPProtobufRequest(
default:
buf = bytes.NewBuffer(traceBytes)
}
req, err := http.NewRequest("POST", url, buf)
req, err := http.NewRequest(http.MethodPost, url, buf)
require.NoError(t, err, "Error creating trace POST request: %v", err)
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("Content-Encoding", encoding)
@@ -564,7 +557,7 @@ func TestOTLPReceiverInvalidContentEncoding(t *testing.T) {
body, err := test.reqBodyFunc()
require.NoError(t, err, "Error creating request body: %v", err)

req, err := http.NewRequest("POST", url, body)
req, err := http.NewRequest(http.MethodPost, url, body)
require.NoError(t, err, "Error creating trace POST request: %v", err)
req.Header.Set("Content-Type", test.content)
req.Header.Set("Content-Encoding", test.encoding)
@@ -594,7 +587,7 @@ func TestGRPCNewPortAlreadyUsed(t *testing.T) {
assert.NoError(t, ln.Close())
})

r := newGRPCReceiver(t, otlpReceiverName, addr, consumertest.NewNop(), consumertest.NewNop())
r := newGRPCReceiver(t, addr, consumertest.NewNop(), consumertest.NewNop())
require.NotNil(t, r)

require.Error(t, r.Start(context.Background(), componenttest.NewNopHost()))
@@ -614,98 +607,149 @@ func TestHTTPNewPortAlreadyUsed(t *testing.T) {
require.Error(t, r.Start(context.Background(), componenttest.NewNopHost()))
}

// TestOTLPReceiverTrace_HandleNextConsumerResponse checks if the trace receiver
// TestOTLPReceiverGRPCTracesIngestTest checks that the gRPC trace receiver
// is returning the proper response (return and metrics) when the next consumer
// in the pipeline reports error. The test changes the responses returned by the
// next trace consumer, checks if data was passed down the pipeline and if
// proper metrics were recorded. It also uses all endpoints supported by the
// trace receiver.
func TestOTLPReceiverTrace_HandleNextConsumerResponse(t *testing.T) {
func TestOTLPReceiverGRPCTracesIngestTest(t *testing.T) {
type ingestionStateTest struct {
okToIngest bool
expectedCode codes.Code
}
tests := []struct {
name string
expectedReceivedBatches int
expectedIngestionBlockedRPCs int
ingestionStates []ingestionStateTest
}{

expectedReceivedBatches := 2
expectedIngestionBlockedRPCs := 1
ingestionStates := []ingestionStateTest{
{
name: "IngestTest",
expectedReceivedBatches: 2,
expectedIngestionBlockedRPCs: 1,
ingestionStates: []ingestionStateTest{
{
okToIngest: true,
expectedCode: codes.OK,
},
{
okToIngest: false,
expectedCode: codes.Unknown,
},
{
okToIngest: true,
expectedCode: codes.OK,
},
},
okToIngest: true,
expectedCode: codes.OK,
},
{
okToIngest: false,
expectedCode: codes.Unknown,
},
{
okToIngest: true,
expectedCode: codes.OK,
},
}

addr := testutil.GetAvailableLocalAddress(t)
req := testdata.GenerateTraces(1)
td := testdata.GenerateTraces(1)

exporters := []struct {
receiverTag string
exportFn func(
cc *grpc.ClientConn,
td ptrace.Traces) error
}{
tt, err := obsreporttest.SetupTelemetry(otlpReceiverID)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

sink := &errOrSinkConsumer{TracesSink: new(consumertest.TracesSink)}

ocr := newGRPCReceiver(t, addr, sink, nil)
require.NotNil(t, ocr)
require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() { require.NoError(t, ocr.Shutdown(context.Background())) })

cc, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
require.NoError(t, err)
defer func() {
assert.NoError(t, cc.Close())
}()

for _, ingestionState := range ingestionStates {
if ingestionState.okToIngest {
sink.SetConsumeError(nil)
} else {
sink.SetConsumeError(errors.New("consumer error"))
}

_, err = ptraceotlp.NewGRPCClient(cc).Export(context.Background(), ptraceotlp.NewExportRequestFromTraces(td))
errStatus, ok := status.FromError(err)
require.True(t, ok)
assert.Equal(t, ingestionState.expectedCode, errStatus.Code())
}

require.Equal(t, expectedReceivedBatches, len(sink.AllTraces()))

require.NoError(t, tt.CheckReceiverTraces("grpc", int64(expectedReceivedBatches), int64(expectedIngestionBlockedRPCs)))
}

// TestOTLPReceiverHTTPTracesNextConsumerResponse checks that the HTTP trace receiver
// is returning the proper response (return and metrics) when the next consumer
// in the pipeline reports error. The test changes the responses returned by the
// next trace consumer, checks if data was passed down the pipeline and if
// proper metrics were recorded. It also uses all endpoints supported by the
// trace receiver.
func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) {
type ingestionStateTest struct {
okToIngest bool
expectedCode codes.Code
}

expectedReceivedBatches := 2
expectedIngestionBlockedRPCs := 1
ingestionStates := []ingestionStateTest{
{
okToIngest: true,
expectedCode: codes.OK,
},
{
okToIngest: false,
expectedCode: codes.Unknown,
},
{
receiverTag: "trace",
exportFn: exportTraces,
okToIngest: true,
expectedCode: codes.OK,
},
}
for _, exporter := range exporters {
for _, test := range tests {
t.Run(test.name+"/"+exporter.receiverTag, func(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry(component.NewIDWithName(typeStr, exporter.receiverTag))
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

sink := &errOrSinkConsumer{TracesSink: new(consumertest.TracesSink)}

ocr := newGRPCReceiver(t, exporter.receiverTag, addr, sink, nil)
require.NotNil(t, ocr)
require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() { require.NoError(t, ocr.Shutdown(context.Background())) })

cc, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
require.NoError(t, err)
defer func() {
assert.NoError(t, cc.Close())
}()

for _, ingestionState := range test.ingestionStates {
if ingestionState.okToIngest {
sink.SetConsumeError(nil)
} else {
sink.SetConsumeError(fmt.Errorf("%q: consumer error", test.name))
}

err = exporter.exportFn(cc, req)
addr := testutil.GetAvailableLocalAddress(t)
td := testdata.GenerateTraces(1)

status, ok := status.FromError(err)
require.True(t, ok)
assert.Equal(t, ingestionState.expectedCode, status.Code())
}
tt, err := obsreporttest.SetupTelemetry(otlpReceiverID)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

require.Equal(t, test.expectedReceivedBatches, len(sink.AllTraces()))
sink := &errOrSinkConsumer{TracesSink: new(consumertest.TracesSink)}

require.NoError(t, tt.CheckReceiverTraces("grpc", int64(test.expectedReceivedBatches), int64(test.expectedIngestionBlockedRPCs)))
})
ocr := newHTTPReceiver(t, addr, sink, nil)
require.NotNil(t, ocr)
require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() { require.NoError(t, ocr.Shutdown(context.Background())) })

for _, ingestionState := range ingestionStates {
if ingestionState.okToIngest {
sink.SetConsumeError(nil)
} else {
sink.SetConsumeError(errors.New("consumer error"))
}

pbMarshaler := ptrace.ProtoMarshaler{}
pbBytes, err := pbMarshaler.MarshalTraces(td)
require.NoError(t, err)
req, err := http.NewRequest(http.MethodPost, "http://"+addr+"/v1/traces", bytes.NewReader(pbBytes))
require.NoError(t, err)
req.Header.Set("Content-Type", pbContentType)
resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
respBytes, err := io.ReadAll(resp.Body)
require.NoError(t, err)

if ingestionState.expectedCode == codes.OK {
require.Equal(t, 200, resp.StatusCode)
tr := ptraceotlp.NewExportResponse()
assert.NoError(t, tr.UnmarshalProto(respBytes))
} else {
errStatus := &spb.Status{}
assert.NoError(t, proto.Unmarshal(respBytes, errStatus))
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
assert.Equal(t, ingestionState.expectedCode, codes.Code(errStatus.Code))
}
}

require.Equal(t, expectedReceivedBatches, len(sink.AllTraces()))

require.NoError(t, tt.CheckReceiverTraces("http", int64(expectedReceivedBatches), int64(expectedIngestionBlockedRPCs)))
}

func TestGRPCInvalidTLSCredentials(t *testing.T) {
@@ -825,7 +869,7 @@ func testHTTPMaxRequestBodySizeJSON(t *testing.T, payload []byte, size int, expe
assert.NotNil(t, r)
require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()))

req, err := http.NewRequest("POST", url, bytes.NewReader(payload))
req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(payload))
require.NoError(t, err)
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
@@ -847,12 +891,12 @@ func TestHTTPMaxRequestBodySize_TooLarge(t *testing.T) {
testHTTPMaxRequestBodySizeJSON(t, traceJSON, len(traceJSON)-1, 400)
}

func newGRPCReceiver(t *testing.T, name string, endpoint string, tc consumer.Traces, mc consumer.Metrics) component.Component {
func newGRPCReceiver(t *testing.T, endpoint string, tc consumer.Traces, mc consumer.Metrics) component.Component {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.GRPC.NetAddr.Endpoint = endpoint
cfg.HTTP = nil
return newReceiver(t, factory, cfg, component.NewIDWithName(typeStr, name), tc, mc)
return newReceiver(t, factory, cfg, otlpReceiverID, tc, mc)
}

func newHTTPReceiver(t *testing.T, endpoint string, tc consumer.Traces, mc consumer.Metrics) component.Component {
@@ -933,9 +977,7 @@ func TestShutdown(t *testing.T) {
// Send request via OTLP/HTTP.
marshaler := &ptrace.ProtoMarshaler{}
traceBytes, err2 := marshaler.MarshalTraces(td)
if err2 != nil {
t.Errorf("Error marshaling protobuf: %v", err2)
}
require.NoError(t, err2)
url := fmt.Sprintf("http://%s/v1/traces", endpointHTTP)
req := createHTTPProtobufRequest(t, url, "", traceBytes)
client := &http.Client{}

0 comments on commit def5561

Please sign in to comment.