diff --git a/cmd/collector/app/collector.go b/cmd/collector/app/collector.go index 77cf6ff35c6..ee2fee33cc8 100644 --- a/cmd/collector/app/collector.go +++ b/cmd/collector/app/collector.go @@ -145,7 +145,14 @@ func (c *Collector) Start(options *CollectorOptions) error { } c.zkServer = zkServer - otlpReceiver, err := handler.StartOtelReceiver(c.logger, c.spanProcessor) + otlpReceiver, err := handler.StartOtelReceiver( + handler.OtelReceiverOptions{ + GRPCHostPort: options.OTLP.GRPCHostPort, + HTTPHostPort: options.OTLP.HTTPHostPort, + }, + c.logger, + c.spanProcessor, + ) if err != nil { return err } diff --git a/cmd/collector/app/flags.go b/cmd/collector/app/flags.go index 390247013b7..2a9bf85392b 100644 --- a/cmd/collector/app/flags.go +++ b/cmd/collector/app/flags.go @@ -84,6 +84,11 @@ type CollectorOptions struct { // See gRPC's keepalive.ServerParameters#MaxConnectionAgeGrace. MaxConnectionAgeGrace time.Duration } + // OTLP section defines options for servers accepting OpenTelemetry OTLP format + OTLP struct { + GRPCHostPort string + HTTPHostPort string + } // Zipkin section defines options for Zipkin HTTP server Zipkin struct { // HTTPHostPort is the host:port address that the Zipkin collector service listens in on for http requests diff --git a/cmd/collector/app/handler/grpc_handler.go b/cmd/collector/app/handler/grpc_handler.go index e565178baaf..a98c5ce29d2 100644 --- a/cmd/collector/app/handler/grpc_handler.go +++ b/cmd/collector/app/handler/grpc_handler.go @@ -23,40 +23,62 @@ import ( "google.golang.org/grpc/status" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" + "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/proto-gen/api_v2" ) // GRPCHandler implements gRPC CollectorService. type GRPCHandler struct { - logger *zap.Logger - spanProcessor processor.SpanProcessor + logger *zap.Logger + // spanProcessor processor.SpanProcessor + batchConsumer batchConsumer } // NewGRPCHandler registers routes for this handler on the given router. func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor) *GRPCHandler { return &GRPCHandler{ - logger: logger, - spanProcessor: spanProcessor, + logger: logger, + // spanProcessor: spanProcessor, + batchConsumer: batchConsumer{ + logger: logger, + spanProcessor: spanProcessor, + spanOptions: processor.SpansOptions{ + InboundTransport: processor.GRPCTransport, + SpanFormat: processor.ProtoSpanFormat, + }, + }, } } // PostSpans implements gRPC CollectorService. func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) { - for _, span := range r.GetBatch().Spans { + batch := &r.Batch + err := g.batchConsumer.consume(batch) + return &api_v2.PostSpansResponse{}, err +} + +type batchConsumer struct { + logger *zap.Logger + spanProcessor processor.SpanProcessor + spanOptions processor.SpansOptions +} + +func (c *batchConsumer) consume(batch *model.Batch) error { + for _, span := range batch.Spans { if span.GetProcess() == nil { - span.Process = r.Batch.Process + span.Process = batch.Process } } - _, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, processor.SpansOptions{ + _, err := c.spanProcessor.ProcessSpans(batch.Spans, processor.SpansOptions{ InboundTransport: processor.GRPCTransport, SpanFormat: processor.ProtoSpanFormat, }) if err != nil { if err == processor.ErrBusy { - return nil, status.Errorf(codes.ResourceExhausted, err.Error()) + return status.Errorf(codes.ResourceExhausted, err.Error()) } - g.logger.Error("cannot process spans", zap.Error(err)) - return nil, err + c.logger.Error("cannot process spans", zap.Error(err)) + return err } - return &api_v2.PostSpansResponse{}, nil + return nil } diff --git a/cmd/collector/app/handler/grpc_handler_test.go b/cmd/collector/app/handler/grpc_handler_test.go index 3b109af70f9..a38b7bc41bf 100644 --- a/cmd/collector/app/handler/grpc_handler_test.go +++ b/cmd/collector/app/handler/grpc_handler_test.go @@ -29,6 +29,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/proto-gen/api_v2" ) @@ -123,32 +124,55 @@ func TestGRPCCompressionEnabled(t *testing.T) { defer conn.Close() // Do not use string constant imported from grpc, since we are actually testing that package is imported by the handler. - _, err := client.PostSpans(context.Background(), &api_v2.PostSpansRequest{}, - grpc.UseCompressor("gzip")) + _, err := client.PostSpans( + context.Background(), + &api_v2.PostSpansRequest{}, + grpc.UseCompressor("gzip"), + ) require.NoError(t, err) } func TestPostSpansWithError(t *testing.T) { - expectedError := errors.New("test-error") - processor := &mockSpanProcessor{expectedError: expectedError} - server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) { - handler := NewGRPCHandler(zap.NewNop(), processor) - api_v2.RegisterCollectorServiceServer(s, handler) - }) - defer server.Stop() - client, conn := newClient(t, addr) - defer conn.Close() - r, err := client.PostSpans(context.Background(), &api_v2.PostSpansRequest{ - Batch: model.Batch{ - Spans: []*model.Span{ - { - OperationName: "fake-operation", - }, - }, + testCases := []struct { + processorError error + expectedError string + expectedLog string + }{ + { + processorError: errors.New("test-error"), + expectedError: "test-error", + expectedLog: "test-error", }, - }) - require.Error(t, err) - require.Nil(t, r) - require.Contains(t, err.Error(), expectedError.Error()) - require.Len(t, processor.getSpans(), 1) + { + processorError: processor.ErrBusy, + expectedError: "server busy", + }, + } + for _, test := range testCases { + t.Run(test.expectedError, func(t *testing.T) { + processor := &mockSpanProcessor{expectedError: test.processorError} + logger, logBuf := testutils.NewLogger() + server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) { + handler := NewGRPCHandler(logger, processor) + api_v2.RegisterCollectorServiceServer(s, handler) + }) + defer server.Stop() + client, conn := newClient(t, addr) + defer conn.Close() + r, err := client.PostSpans(context.Background(), &api_v2.PostSpansRequest{ + Batch: model.Batch{ + Spans: []*model.Span{ + { + OperationName: "fake-operation", + }, + }, + }, + }) + require.Error(t, err) + require.Nil(t, r) + assert.Contains(t, err.Error(), test.expectedError) + assert.Contains(t, logBuf.String(), test.expectedLog) + assert.Len(t, processor.getSpans(), 1) + }) + } } diff --git a/cmd/collector/app/handler/otlp_receiver.go b/cmd/collector/app/handler/otlp_receiver.go index 5499904eeed..3e156b94b33 100644 --- a/cmd/collector/app/handler/otlp_receiver.go +++ b/cmd/collector/app/handler/otlp_receiver.go @@ -29,66 +29,86 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/model" - "github.com/jaegertracing/jaeger/proto-gen/api_v2" ) -// A delegation function to assist in tests, because ProtoFromTraces never returns errors despite its API. -var protoFromTraces func(td ptrace.Traces) ([]*model.Batch, error) = otlp2jaeger.ProtoFromTraces +// // A delegation function to assist in tests, because ProtoFromTraces never returns errors despite its API. +// var protoFromTraces func(td ptrace.Traces) ([]*model.Batch, error) = otlp2jaeger.ProtoFromTraces var _ component.Host = (*otelHost)(nil) // API check +// OtelReceiverOptions allows configuration of the receiver. type OtelReceiverOptions struct { - GRPCAddress string - HTTPAddress string + GRPCHostPort string + HTTPHostPort string } // StartOtelReceiver starts OpenTelemetry OTLP receiver listening on gRPC and HTTP ports. -func StartOtelReceiver(logger *zap.Logger, spanProcessor processor.SpanProcessor) (component.TracesReceiver, error) { +func StartOtelReceiver(options OtelReceiverOptions, logger *zap.Logger, spanProcessor processor.SpanProcessor) (component.TracesReceiver, error) { otlpFactory := otlpreceiver.NewFactory() - otlpReceiverConfig := otlpFactory.CreateDefaultConfig() + otlpReceiverConfig := otlpFactory.CreateDefaultConfig().(*otlpreceiver.Config) + if options.GRPCHostPort != "" { + otlpReceiverConfig.GRPC.NetAddr.Endpoint = options.GRPCHostPort + } + if options.HTTPHostPort != "" { + otlpReceiverConfig.HTTP.Endpoint = options.HTTPHostPort + } otlpReceiverSettings := component.ReceiverCreateSettings{ TelemetrySettings: component.TelemetrySettings{ Logger: logger, TracerProvider: otel.GetTracerProvider(), // TODO we may always want no-op here, not the global default }, } - // TODO re-implement the logic of NewGRPCHandler, it's fairly trivial - jaegerBatchHandler := NewGRPCHandler(logger, spanProcessor) - nextConsumer, err := consumer.NewTraces(consumer.ConsumeTracesFunc(func(ctx context.Context, ld ptrace.Traces) error { - batches, err := protoFromTraces(ld) - if err != nil { - return err - } - for _, batch := range batches { - // TODO generate metrics - _, err := jaegerBatchHandler.PostSpans(ctx, &api_v2.PostSpansRequest{ - Batch: *batch, - }) - if err != nil { - return err - } - } - return nil - })) - if err != nil { - return nil, fmt.Errorf("could not create the OTLP consumer: %w", err) - } - otlpReceiver, err := otlpFactory.CreateTracesReceiver( + + otlpConsumer := newConsumerDelegate(logger, spanProcessor) + // the following two constructors never return errors given non-nil arguments, so we ignore errors + nextConsumer, _ := consumer.NewTraces(consumer.ConsumeTracesFunc(otlpConsumer.consume)) + otlpReceiver, _ := otlpFactory.CreateTracesReceiver( context.Background(), otlpReceiverSettings, otlpReceiverConfig, nextConsumer, ) - if err != nil { - return nil, fmt.Errorf("could not create the OTLP receiver: %w", err) - } - err = otlpReceiver.Start(context.Background(), &otelHost{logger: logger}) - if err != nil { + if err := otlpReceiver.Start(context.Background(), &otelHost{logger: logger}); err != nil { return nil, fmt.Errorf("could not start the OTLP receiver: %w", err) } return otlpReceiver, nil } +func newConsumerDelegate(logger *zap.Logger, spanProcessor processor.SpanProcessor) *consumerDelegate { + return &consumerDelegate{ + logger: logger, + batchConsumer: batchConsumer{ + logger: logger, + spanProcessor: spanProcessor, + spanOptions: processor.SpansOptions{ + SpanFormat: processor.OTLPSpanFormat, + InboundTransport: processor.UnknownTransport, // could be gRPC or HTTP + }, + }, + protoFromTraces: otlp2jaeger.ProtoFromTraces, + } +} + +type consumerDelegate struct { + logger *zap.Logger + batchConsumer batchConsumer + protoFromTraces func(td ptrace.Traces) ([]*model.Batch, error) +} + +func (c *consumerDelegate) consume(ctx context.Context, ld ptrace.Traces) error { + batches, err := c.protoFromTraces(ld) + if err != nil { + return err + } + for _, batch := range batches { + err := c.batchConsumer.consume(batch) + if err != nil { + return err + } + } + return nil +} + // otelHost is a mostly no-op implementation of OTEL component.Host type otelHost struct { logger *zap.Logger diff --git a/cmd/collector/app/handler/otlp_receiver_test.go b/cmd/collector/app/handler/otlp_receiver_test.go new file mode 100644 index 00000000000..5d03a46b388 --- /dev/null +++ b/cmd/collector/app/handler/otlp_receiver_test.go @@ -0,0 +1,126 @@ +// Copyright (c) 2022 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package handler + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestStartOtlpReceiver(t *testing.T) { + spanProcessor := &mockSpanProcessor{} + logger, _ := testutils.NewLogger() + rec, err := StartOtelReceiver( + OtelReceiverOptions{ + GRPCHostPort: ":0", + HTTPHostPort: ":0", + }, + logger, + spanProcessor, + ) + require.NoError(t, err) + defer func() { + assert.NoError(t, rec.Shutdown(context.Background())) + }() + + // Ideally, we want to test with a real gRPC client, but OTEL repos only have those as internal packages. + // So we will rely on otlpreceiver being tested in the OTEL repos, and we only test the consumer function. +} + +func makeTracesOneSpan() ptrace.Traces { + traces := ptrace.NewTraces() + rSpans := traces.ResourceSpans().AppendEmpty() + sSpans := rSpans.ScopeSpans().AppendEmpty() + span := sSpans.Spans().AppendEmpty() + span.SetName("test") + return traces +} + +func TestConsumerDelegate(t *testing.T) { + logger, _ := testutils.NewLogger() + spanProcessor := &mockSpanProcessor{} + consumer := newConsumerDelegate(logger, spanProcessor) + + err := consumer.consume(context.Background(), makeTracesOneSpan()) + require.NoError(t, err) + assert.Len(t, spanProcessor.getSpans(), 1) +} + +func TestConsumerDelegate_Error(t *testing.T) { + logger, logBuf := testutils.NewLogger() + expectedErr := errors.New("test-error") + spanProcessor := &mockSpanProcessor{ + expectedError: expectedErr, + } + consumer := newConsumerDelegate(logger, spanProcessor) + + err := consumer.consume(context.Background(), makeTracesOneSpan()) + require.Error(t, err) + assert.Equal(t, expectedErr, err) + assert.Contains(t, logBuf.String(), "test-error") +} + +func TestStartOtlpReceiver_Error(t *testing.T) { + spanProcessor := &mockSpanProcessor{} + logger, _ := testutils.NewLogger() + _, err := StartOtelReceiver( + OtelReceiverOptions{ + GRPCHostPort: ":-1", + HTTPHostPort: ":-1", + }, + logger, + spanProcessor, + ) + assert.Error(t, err) +} +func TestProtoFromTracesError(t *testing.T) { + mockErr := errors.New("mock error") + c := &consumerDelegate{ + protoFromTraces: func(td ptrace.Traces) ([]*model.Batch, error) { + return nil, mockErr + }, + } + err := c.consume(context.Background(), ptrace.Traces{}) + assert.Equal(t, mockErr, err) +} + +func TestOtelHost_ReportFatalError(t *testing.T) { + logger, buf := testutils.NewLogger() + host := &otelHost{logger: logger} + + defer func() { + _ = recover() + assert.Contains(t, buf.String(), "mock error") + }() + host.ReportFatalError(errors.New("mock error")) + t.Errorf("ReportFatalError did not panic") +} + +func TestOtelHost(t *testing.T) { + host := &otelHost{} + assert.Nil(t, host.GetFactory(component.KindReceiver, config.TracesDataType)) + assert.Nil(t, host.GetExtensions()) + assert.Nil(t, host.GetExporters()) +} diff --git a/cmd/collector/app/handler/otlp_receover_test.go b/cmd/collector/app/handler/otlp_receover_test.go deleted file mode 100644 index b55a0997214..00000000000 --- a/cmd/collector/app/handler/otlp_receover_test.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright (c) 2022 The Jaeger Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package handler - -import ( - "context" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/jaegertracing/jaeger/pkg/testutils" -) - -func TestOtlpReceiver(t *testing.T) { - spanProcessor := &mockSpanProcessor{} - logger, _ := testutils.NewLogger() - rec, err := StartOtelReceiver(logger, spanProcessor) - require.NoError(t, err) - err = rec.Shutdown(context.Background()) - require.NoError(t, err) -} diff --git a/cmd/collector/app/processor/interface.go b/cmd/collector/app/processor/interface.go index de6f81012d2..f22a92051cd 100644 --- a/cmd/collector/app/processor/interface.go +++ b/cmd/collector/app/processor/interface.go @@ -60,6 +60,8 @@ const ( ZipkinSpanFormat SpanFormat = "zipkin" // ProtoSpanFormat is for Jaeger protobuf Spans. ProtoSpanFormat SpanFormat = "proto" + // OTLPSpanFormat is for OpenTelemetry OTLP format. + OTLPSpanFormat SpanFormat = "otlp" // UnknownSpanFormat is the fallback/catch-all category. UnknownSpanFormat SpanFormat = "unknown" )