diff --git a/cmd/collector/app/collector.go b/cmd/collector/app/collector.go index a45f51f8dc4e..64dd39634b62 100644 --- a/cmd/collector/app/collector.go +++ b/cmd/collector/app/collector.go @@ -151,11 +151,13 @@ func (c *Collector) Start(options *flags.CollectorOptions) error { } c.zkServer = zkServer - otlpReceiver, err := handler.StartOtelReceiver(options, c.logger, c.spanProcessor) - if err != nil { - return fmt.Errorf("could not start OTLP receiver: %w", err) + if options.OTLP.Enabled { + otlpReceiver, err := handler.StartOTLPReceiver(options, c.logger, c.spanProcessor) + if err != nil { + return fmt.Errorf("could not start OTLP receiver: %w", err) + } + c.otlpReceiver = otlpReceiver } - c.otlpReceiver = otlpReceiver c.publishOpts(options) diff --git a/cmd/collector/app/collector_test.go b/cmd/collector/app/collector_test.go index 862086ba779d..e814269dfa5c 100644 --- a/cmd/collector/app/collector_test.go +++ b/cmd/collector/app/collector_test.go @@ -39,8 +39,10 @@ func optionsForEphemeralPorts() *flags.CollectorOptions { collectorOpts := &flags.CollectorOptions{} collectorOpts.GRPC.HostPort = ":0" collectorOpts.HTTP.HostPort = ":0" + collectorOpts.OTLP.Enabled = true collectorOpts.OTLP.GRPC.HostPort = ":0" collectorOpts.OTLP.HTTP.HostPort = ":0" + collectorOpts.Zipkin.HTTPHostPort = ":0" return collectorOpts } @@ -60,8 +62,10 @@ func TestNewCollector(t *testing.T) { StrategyStore: strategyStore, HealthCheck: hc, }) + collectorOpts := optionsForEphemeralPorts() require.NoError(t, c.Start(collectorOpts)) + assert.NotNil(t, c.SpanHandlers()) assert.NoError(t, c.Close()) } diff --git a/cmd/collector/app/flags/flags.go b/cmd/collector/app/flags/flags.go index ec7976d9d027..77641d648712 100644 --- a/cmd/collector/app/flags/flags.go +++ b/cmd/collector/app/flags/flags.go @@ -40,6 +40,8 @@ const ( flagSuffixGRPCMaxConnectionAge = "max-connection-age" flagSuffixGRPCMaxConnectionAgeGrace = "max-connection-age-grace" + flagCollectorOTLPEnabled = "collector.otlp.enabled" + flagZipkinHTTPHostPort = "collector.zipkin.host-port" flagZipkinAllowedHeaders = "collector.zipkin.allowed-headers" flagZipkinAllowedOrigins = "collector.zipkin.allowed-origins" @@ -104,8 +106,9 @@ type CollectorOptions struct { GRPC GRPCOptions // OTLP section defines options for servers accepting OpenTelemetry OTLP format OTLP struct { - GRPC GRPCOptions - HTTP HTTPOptions + Enabled bool + GRPC GRPCOptions + HTTP HTTPOptions } // Zipkin section defines options for Zipkin HTTP server Zipkin struct { @@ -161,6 +164,7 @@ func AddFlags(flags *flag.FlagSet) { addHTTPFlags(flags, httpServerFlagsCfg, ports.PortToHostPort(ports.CollectorHTTP)) addGRPCFlags(flags, grpcServerFlagsCfg, ports.PortToHostPort(ports.CollectorGRPC)) + flags.Bool(flagCollectorOTLPEnabled, false, "Enables OpenTelemetry OTLP receiver on dedicated HTTP and gRPC ports") addHTTPFlags(flags, otlpServerFlagsCfg.HTTP, "") addGRPCFlags(flags, otlpServerFlagsCfg.GRPC, "") @@ -234,10 +238,10 @@ func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) return cOpts, fmt.Errorf("failed to parse gRPC server options: %w", err) } + cOpts.OTLP.Enabled = v.GetBool(flagCollectorOTLPEnabled) if err := cOpts.OTLP.HTTP.initFromViper(v, logger, otlpServerFlagsCfg.HTTP); err != nil { return cOpts, fmt.Errorf("failed to parse OTLP/HTTP server options: %w", err) } - if err := cOpts.OTLP.GRPC.initFromViper(v, logger, otlpServerFlagsCfg.GRPC); err != nil { return cOpts, fmt.Errorf("failed to parse OTLP/gRPC server options: %w", err) } diff --git a/cmd/collector/app/flags/flags_test.go b/cmd/collector/app/flags/flags_test.go index 7e8657bab2fd..c9299d26bcb1 100644 --- a/cmd/collector/app/flags/flags_test.go +++ b/cmd/collector/app/flags/flags_test.go @@ -57,43 +57,28 @@ func TestCollectorOptionsWithFlags_CheckFullHostPort(t *testing.T) { assert.Equal(t, "0.0.0.0:3456", c.Zipkin.HTTPHostPort) } -func TestCollectorOptionsWithFailedHTTPFlags(t *testing.T) { - c := &CollectorOptions{} - v, command := config.Viperize(AddFlags) - err := command.ParseFlags([]string{ - "--collector.http.tls.enabled=false", - "--collector.http.tls.cert=blah", // invalid unless tls.enabled - }) - require.NoError(t, err) - _, err = c.InitFromViper(v, zap.NewNop()) - require.Error(t, err) - assert.Contains(t, err.Error(), "failed to parse HTTP TLS options") -} - -func TestCollectorOptionsWithFailedGRPCFlags(t *testing.T) { - c := &CollectorOptions{} - v, command := config.Viperize(AddFlags) - err := command.ParseFlags([]string{ - "--collector.grpc.tls.enabled=false", - "--collector.grpc.tls.cert=blah", // invalid unless tls.enabled - }) - require.NoError(t, err) - _, err = c.InitFromViper(v, zap.NewNop()) - require.Error(t, err) - assert.Contains(t, err.Error(), "failed to parse gRPC TLS options") -} - -func TestCollectorOptionsWithFailedZipkinFlags(t *testing.T) { - c := &CollectorOptions{} - v, command := config.Viperize(AddFlags) - err := command.ParseFlags([]string{ - "--collector.zipkin.tls.enabled=false", - "--collector.zipkin.tls.cert=blah", // invalid unless tls.enabled - }) - require.NoError(t, err) - _, err = c.InitFromViper(v, zap.NewNop()) - require.Error(t, err) - assert.Contains(t, err.Error(), "failed to parse Zipkin TLS options") +func TestCollectorOptionsWithFailedTLSFlags(t *testing.T) { + prefixes := []string{ + "--collector.http", + "--collector.grpc", + "--collector.zipkin", + "--collector.otlp.http", + "--collector.otlp.grpc", + } + for _, prefix := range prefixes { + t.Run(prefix, func(t *testing.T) { + c := &CollectorOptions{} + v, command := config.Viperize(AddFlags) + err := command.ParseFlags([]string{ + prefix + ".tls.enabled=false", + prefix + ".tls.cert=blah", // invalid unless tls.enabled + }) + require.NoError(t, err) + _, err = c.InitFromViper(v, zap.NewNop()) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to parse") + }) + } } func TestCollectorOptionsWithFlags_CheckMaxReceiveMessageLength(t *testing.T) { diff --git a/cmd/collector/app/handler/otlp_receiver.go b/cmd/collector/app/handler/otlp_receiver.go index c29ad3df3015..31be44633e5b 100644 --- a/cmd/collector/app/handler/otlp_receiver.go +++ b/cmd/collector/app/handler/otlp_receiver.go @@ -17,7 +17,6 @@ package handler import ( "context" "fmt" - "time" otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/component" @@ -31,45 +30,14 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/flags" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/model" - "github.com/jaegertracing/jaeger/pkg/config/tlscfg" ) var _ component.Host = (*otelHost)(nil) // API check -// OtelReceiverOptions allows configuration of the receiver. -type OtelReceiverOptions struct { - GRPC OtelReceiverGRPCOptions - HTTP OtelReceiverHTTPOptions -} - -// OtelReceiverGRPCOptions allows configuration of the GRPC receiver. -type OtelReceiverGRPCOptions struct { - // HostPort is the host:port address that the server listens on - HostPort string - // TLS configures secure transport for HTTP endpoint - TLS tlscfg.Options - // MaxReceiveMessageLength is the maximum message size receivable by the gRPC Collector. - MaxReceiveMessageLength int - // MaxConnectionAge is a duration for the maximum amount of time a connection may exist. - // See gRPC's keepalive.ServerParameters#MaxConnectionAge. - MaxConnectionAge time.Duration - // MaxConnectionAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed. - // See gRPC's keepalive.ServerParameters#MaxConnectionAgeGrace. - MaxConnectionAgeGrace time.Duration -} - -// OtelReceiverHTTPOptions defines options for an HTTP server -type OtelReceiverHTTPOptions struct { - // HostPort is the host:port address that the server listens on - HostPort string - // TLS configures secure transport for HTTP endpoint - TLS tlscfg.Options -} - -// StartOtelReceiver starts OpenTelemetry OTLP receiver listening on gRPC and HTTP ports. -func StartOtelReceiver(options *flags.CollectorOptions, logger *zap.Logger, spanProcessor processor.SpanProcessor) (component.TracesReceiver, error) { +// StartOTLPReceiver starts OpenTelemetry OTLP receiver listening on gRPC and HTTP ports. +func StartOTLPReceiver(options *flags.CollectorOptions, logger *zap.Logger, spanProcessor processor.SpanProcessor) (component.TracesReceiver, error) { otlpFactory := otlpreceiver.NewFactory() - return startOtelReceiver( + return startOTLPReceiver( options, logger, spanProcessor, @@ -82,7 +50,7 @@ func StartOtelReceiver(options *flags.CollectorOptions, logger *zap.Logger, span // Some of OTELCOL constructor functions return errors when passed nil arguments, // which is a situation we cannot reproduce. To test our own error handling, this // function allows to mock those constructors. -func startOtelReceiver( +func startOTLPReceiver( options *flags.CollectorOptions, logger *zap.Logger, spanProcessor processor.SpanProcessor, diff --git a/cmd/collector/app/handler/otlp_receiver_test.go b/cmd/collector/app/handler/otlp_receiver_test.go index fb56d67d0be5..fef7a02ac1d9 100644 --- a/cmd/collector/app/handler/otlp_receiver_test.go +++ b/cmd/collector/app/handler/otlp_receiver_test.go @@ -46,7 +46,7 @@ func optionsWithPorts(port string) *flags.CollectorOptions { func TestStartOtlpReceiver(t *testing.T) { spanProcessor := &mockSpanProcessor{} logger, _ := testutils.NewLogger() - rec, err := StartOtelReceiver(optionsWithPorts(":0"), logger, spanProcessor) + rec, err := StartOTLPReceiver(optionsWithPorts(":0"), logger, spanProcessor) require.NoError(t, err) defer func() { assert.NoError(t, rec.Shutdown(context.Background())) @@ -96,7 +96,7 @@ func TestStartOtlpReceiver_Error(t *testing.T) { spanProcessor := &mockSpanProcessor{} logger, _ := testutils.NewLogger() opts := optionsWithPorts(":-1") - _, err := StartOtelReceiver(opts, logger, spanProcessor) + _, err := StartOTLPReceiver(opts, logger, spanProcessor) require.Error(t, err) assert.Contains(t, err.Error(), "could not start the OTLP receiver") @@ -104,7 +104,7 @@ func TestStartOtlpReceiver_Error(t *testing.T) { return nil, errors.New("mock error") } f := otlpreceiver.NewFactory() - _, err = startOtelReceiver(opts, logger, spanProcessor, f, newTraces, f.CreateTracesReceiver) + _, err = startOTLPReceiver(opts, logger, spanProcessor, f, newTraces, f.CreateTracesReceiver) require.Error(t, err) assert.Contains(t, err.Error(), "could not create the OTLP consumer") @@ -112,7 +112,7 @@ func TestStartOtlpReceiver_Error(t *testing.T) { _ config.Receiver, _ consumer.Traces) (component.TracesReceiver, error) { return nil, errors.New("mock error") } - _, err = startOtelReceiver(opts, logger, spanProcessor, f, consumer.NewTraces, createTracesReceiver) + _, err = startOTLPReceiver(opts, logger, spanProcessor, f, consumer.NewTraces, createTracesReceiver) require.Error(t, err) assert.Contains(t, err.Error(), "could not create the OTLP receiver") }