From eea3bffbf46e48afd9defe2222f853c4f6b45135 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Mon, 30 May 2022 14:45:44 -0400 Subject: [PATCH] Introduce OTLP receiver configuration flags (#3710) Signed-off-by: Albert Teoh --- cmd/all-in-one/main.go | 5 +- cmd/collector/app/collector.go | 33 +-- cmd/collector/app/collector_test.go | 25 +- cmd/collector/app/flags.go | 158 ----------- cmd/collector/app/flags/flags.go | 259 ++++++++++++++++++ cmd/collector/app/{ => flags}/flags_test.go | 74 +++-- cmd/collector/app/handler/otlp_receiver.go | 96 ++++++- .../app/handler/otlp_receiver_test.go | 122 +++++++-- cmd/collector/app/options.go | 12 +- cmd/collector/app/options_test.go | 3 +- cmd/collector/app/span_handler_builder.go | 3 +- .../app/span_handler_builder_test.go | 11 +- cmd/collector/main.go | 9 +- 13 files changed, 531 insertions(+), 279 deletions(-) delete mode 100644 cmd/collector/app/flags.go create mode 100644 cmd/collector/app/flags/flags.go rename cmd/collector/app/{ => flags}/flags_test.go (65%) diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index 269595a3291..b9d8da62c5b 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -37,6 +37,7 @@ import ( agentGrpcRep "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc" "github.com/jaegertracing/jaeger/cmd/all-in-one/setupcontext" collectorApp "github.com/jaegertracing/jaeger/cmd/collector/app" + collectorFlags "github.com/jaegertracing/jaeger/cmd/collector/app/flags" "github.com/jaegertracing/jaeger/cmd/docs" "github.com/jaegertracing/jaeger/cmd/env" "github.com/jaegertracing/jaeger/cmd/flags" @@ -146,7 +147,7 @@ by default uses only in-memory database.`, if err != nil { logger.Fatal("Failed to configure connection for grpc", zap.Error(err)) } - cOpts, err := new(collectorApp.CollectorOptions).InitFromViper(v) + cOpts, err := new(collectorFlags.CollectorOptions).InitFromViper(v, logger) if err != nil { logger.Fatal("Failed to initialize collector", zap.Error(err)) } @@ -227,7 +228,7 @@ by default uses only in-memory database.`, agentApp.AddFlags, agentRep.AddFlags, agentGrpcRep.AddFlags, - collectorApp.AddFlags, + collectorFlags.AddFlags, queryApp.AddFlags, strategyStoreFactory.AddFlags, metricsReaderFactory.AddFlags, diff --git a/cmd/collector/app/collector.go b/cmd/collector/app/collector.go index 4f475ff25ad..64dd39634b6 100644 --- a/cmd/collector/app/collector.go +++ b/cmd/collector/app/collector.go @@ -26,6 +26,7 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" + "github.com/jaegertracing/jaeger/cmd/collector/app/flags" "github.com/jaegertracing/jaeger/cmd/collector/app/handler" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" @@ -34,6 +35,11 @@ import ( "github.com/jaegertracing/jaeger/storage/spanstore" ) +const ( + metricNumWorkers = "collector.num-workers" + metricQueueSize = "collector.queue-size" +) + // Collector returns the collector as a manageable unit of work type Collector struct { // required to start a new collector @@ -82,10 +88,10 @@ func New(params *CollectorParams) *Collector { } // Start the component and underlying dependencies -func (c *Collector) Start(options *CollectorOptions) error { +func (c *Collector) Start(options *flags.CollectorOptions) error { handlerBuilder := &SpanHandlerBuilder{ SpanWriter: c.spanWriter, - CollectorOpts: *options, + CollectorOpts: options, Logger: c.logger, MetricsFactory: c.metricsFactory, } @@ -145,28 +151,23 @@ func (c *Collector) Start(options *CollectorOptions) error { } c.zkServer = zkServer - otlpReceiver, err := handler.StartOtelReceiver( - handler.OtelReceiverOptions{ - GRPCHostPort: options.OTLP.GRPCHostPort, - HTTPHostPort: options.OTLP.HTTPHostPort, - }, - c.logger, - c.spanProcessor, - ) - if err != nil { - return fmt.Errorf("could not start OTLP receiver: %w", err) + if options.OTLP.Enabled { + otlpReceiver, err := handler.StartOTLPReceiver(options, c.logger, c.spanProcessor) + if err != nil { + return fmt.Errorf("could not start OTLP receiver: %w", err) + } + c.otlpReceiver = otlpReceiver } - c.otlpReceiver = otlpReceiver c.publishOpts(options) return nil } -func (c *Collector) publishOpts(cOpts *CollectorOptions) { +func (c *Collector) publishOpts(cOpts *flags.CollectorOptions) { internalFactory := c.metricsFactory.Namespace(metrics.NSOptions{Name: "internal"}) - internalFactory.Gauge(metrics.Options{Name: collectorNumWorkers}).Update(int64(cOpts.NumWorkers)) - internalFactory.Gauge(metrics.Options{Name: collectorQueueSize}).Update(int64(cOpts.QueueSize)) + internalFactory.Gauge(metrics.Options{Name: metricNumWorkers}).Update(int64(cOpts.NumWorkers)) + internalFactory.Gauge(metrics.Options{Name: metricQueueSize}).Update(int64(cOpts.QueueSize)) } // Close the component and all its underlying dependencies diff --git a/cmd/collector/app/collector_test.go b/cmd/collector/app/collector_test.go index deeb1f6f1c6..e814269dfa5 100644 --- a/cmd/collector/app/collector_test.go +++ b/cmd/collector/app/collector_test.go @@ -26,6 +26,7 @@ import ( "github.com/uber/jaeger-lib/metrics/metricstest" "go.uber.org/zap" + "github.com/jaegertracing/jaeger/cmd/collector/app/flags" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/healthcheck" @@ -34,12 +35,14 @@ import ( var _ (io.Closer) = (*Collector)(nil) -func optionsForEphemeralPorts() *CollectorOptions { - collectorOpts := &CollectorOptions{} +func optionsForEphemeralPorts() *flags.CollectorOptions { + collectorOpts := &flags.CollectorOptions{} collectorOpts.GRPC.HostPort = ":0" collectorOpts.HTTP.HostPort = ":0" - collectorOpts.OTLP.GRPCHostPort = ":0" - collectorOpts.OTLP.HTTPHostPort = ":0" + collectorOpts.OTLP.Enabled = true + collectorOpts.OTLP.GRPC.HostPort = ":0" + collectorOpts.OTLP.HTTP.HostPort = ":0" + collectorOpts.Zipkin.HTTPHostPort = ":0" return collectorOpts } @@ -59,13 +62,15 @@ func TestNewCollector(t *testing.T) { StrategyStore: strategyStore, HealthCheck: hc, }) + collectorOpts := optionsForEphemeralPorts() require.NoError(t, c.Start(collectorOpts)) + assert.NotNil(t, c.SpanHandlers()) assert.NoError(t, c.Close()) } func TestCollector_StartErrors(t *testing.T) { - run := func(name string, options *CollectorOptions, expErr string) { + run := func(name string, options *flags.CollectorOptions, expErr string) { t.Run(name, func(t *testing.T) { hc := healthcheck.New() logger := zap.NewNop() @@ -87,7 +92,7 @@ func TestCollector_StartErrors(t *testing.T) { }) } - var options *CollectorOptions + var options *flags.CollectorOptions options = optionsForEphemeralPorts() options.GRPC.HostPort = ":-1" @@ -102,8 +107,12 @@ func TestCollector_StartErrors(t *testing.T) { run("Zipkin", options, "could not start Zipkin server") options = optionsForEphemeralPorts() - options.OTLP.HTTPHostPort = ":-1" - run("OTLP", options, "could not start OTLP receiver") + options.OTLP.GRPC.HostPort = ":-1" + run("OTLP/GRPC", options, "could not start OTLP receiver") + + options = optionsForEphemeralPorts() + options.OTLP.HTTP.HostPort = ":-1" + run("OTLP/HTTP", options, "could not start OTLP receiver") } type mockStrategyStore struct { diff --git a/cmd/collector/app/flags.go b/cmd/collector/app/flags.go deleted file mode 100644 index 2a9bf85392b..00000000000 --- a/cmd/collector/app/flags.go +++ /dev/null @@ -1,158 +0,0 @@ -// Copyright (c) 2019 The Jaeger Authors. -// Copyright (c) 2017 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package app - -import ( - "flag" - "fmt" - "time" - - "github.com/spf13/viper" - - "github.com/jaegertracing/jaeger/cmd/flags" - "github.com/jaegertracing/jaeger/pkg/config/tlscfg" - "github.com/jaegertracing/jaeger/ports" -) - -const ( - collectorDynQueueSizeMemory = "collector.queue-size-memory" - collectorGRPCHostPort = "collector.grpc-server.host-port" - collectorHTTPHostPort = "collector.http-server.host-port" - collectorNumWorkers = "collector.num-workers" - collectorQueueSize = "collector.queue-size" - collectorTags = "collector.tags" - collectorZipkinAllowedHeaders = "collector.zipkin.allowed-headers" - collectorZipkinAllowedOrigins = "collector.zipkin.allowed-origins" - collectorZipkinHTTPHostPort = "collector.zipkin.host-port" - collectorGRPCMaxReceiveMessageLength = "collector.grpc-server.max-message-size" - collectorMaxConnectionAge = "collector.grpc-server.max-connection-age" - collectorMaxConnectionAgeGrace = "collector.grpc-server.max-connection-age-grace" -) - -var tlsGRPCFlagsConfig = tlscfg.ServerFlagsConfig{ - Prefix: "collector.grpc", -} - -var tlsHTTPFlagsConfig = tlscfg.ServerFlagsConfig{ - Prefix: "collector.http", -} - -var tlsZipkinFlagsConfig = tlscfg.ServerFlagsConfig{ - Prefix: "collector.zipkin", -} - -// CollectorOptions holds configuration for collector -type CollectorOptions struct { - // DynQueueSizeMemory determines how much memory to use for the queue - DynQueueSizeMemory uint - // QueueSize is the size of collector's queue - QueueSize int - // NumWorkers is the number of internal workers in a collector - NumWorkers int - // HTTP section defines options for HTTP server - HTTP struct { - // HostPort is the host:port address that the collector service listens in on for http requests - HostPort string - // TLS configures secure transport for HTTP endpoint to collect spans - TLS tlscfg.Options - } - // GRPC section defines options for gRPC server - GRPC struct { - // HostPort is the host:port address that the collector service listens in on for gRPC requests - HostPort string - // TLS configures secure transport for gRPC endpoint to collect spans - TLS tlscfg.Options - // MaxReceiveMessageLength is the maximum message size receivable by the gRPC Collector. - MaxReceiveMessageLength int - // MaxConnectionAge is a duration for the maximum amount of time a connection may exist. - // See gRPC's keepalive.ServerParameters#MaxConnectionAge. - MaxConnectionAge time.Duration - // MaxConnectionAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed. - // See gRPC's keepalive.ServerParameters#MaxConnectionAgeGrace. - MaxConnectionAgeGrace time.Duration - } - // OTLP section defines options for servers accepting OpenTelemetry OTLP format - OTLP struct { - GRPCHostPort string - HTTPHostPort string - } - // Zipkin section defines options for Zipkin HTTP server - Zipkin struct { - // HTTPHostPort is the host:port address that the Zipkin collector service listens in on for http requests - HTTPHostPort string - // ZipkinAllowedOrigins is a list of origins a cross-domain request to the Zipkin collector service can be executed from - AllowedOrigins string - // ZipkinAllowedHeaders is a list of headers that the Zipkin collector service allowes the client to use with cross-domain requests - AllowedHeaders string - // TLS configures secure transport for Zipkin endpoint to collect spans - TLS tlscfg.Options - } - // CollectorTags is the string representing collector tags to append to each and every span - CollectorTags map[string]string -} - -// AddFlags adds flags for CollectorOptions -func AddFlags(flags *flag.FlagSet) { - flags.Int(collectorNumWorkers, DefaultNumWorkers, "The number of workers pulling items from the queue") - flags.Int(collectorQueueSize, DefaultQueueSize, "The queue size of the collector") - flags.Int(collectorGRPCMaxReceiveMessageLength, DefaultGRPCMaxReceiveMessageLength, "The maximum receivable message size for the collector's GRPC server") - flags.String(collectorGRPCHostPort, ports.PortToHostPort(ports.CollectorGRPC), "The host:port (e.g. 127.0.0.1:14250 or :14250) of the collector's GRPC server") - flags.String(collectorHTTPHostPort, ports.PortToHostPort(ports.CollectorHTTP), "The host:port (e.g. 127.0.0.1:14268 or :14268) of the collector's HTTP server") - flags.String(collectorTags, "", "One or more tags to be added to the Process tags of all spans passing through this collector. Ex: key1=value1,key2=${envVar:defaultValue}") - flags.String(collectorZipkinAllowedHeaders, "content-type", "Comma separated list of allowed headers for the Zipkin collector service, default content-type") - flags.String(collectorZipkinAllowedOrigins, "*", "Comma separated list of allowed origins for the Zipkin collector service, default accepts all") - flags.String(collectorZipkinHTTPHostPort, "", "The host:port (e.g. 127.0.0.1:9411 or :9411) of the collector's Zipkin server (disabled by default)") - flags.Uint(collectorDynQueueSizeMemory, 0, "(experimental) The max memory size in MiB to use for the dynamic queue.") - flags.Duration(collectorMaxConnectionAge, 0, "The maximum amount of time a connection may exist. Set this value to a few seconds or minutes on highly elastic environments, so that clients discover new collector nodes frequently. See https://pkg.go.dev/google.golang.org/grpc/keepalive#ServerParameters") - flags.Duration(collectorMaxConnectionAgeGrace, 0, "The additive period after MaxConnectionAge after which the connection will be forcibly closed. See https://pkg.go.dev/google.golang.org/grpc/keepalive#ServerParameters") - - tlsGRPCFlagsConfig.AddFlags(flags) - tlsHTTPFlagsConfig.AddFlags(flags) - tlsZipkinFlagsConfig.AddFlags(flags) -} - -// InitFromViper initializes CollectorOptions with properties from viper -func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper) (*CollectorOptions, error) { - cOpts.GRPC.HostPort = ports.FormatHostPort(v.GetString(collectorGRPCHostPort)) - cOpts.GRPC.MaxReceiveMessageLength = v.GetInt(collectorGRPCMaxReceiveMessageLength) - cOpts.GRPC.MaxConnectionAge = v.GetDuration(collectorMaxConnectionAge) - cOpts.GRPC.MaxConnectionAgeGrace = v.GetDuration(collectorMaxConnectionAgeGrace) - if tlsGrpc, err := tlsGRPCFlagsConfig.InitFromViper(v); err == nil { - cOpts.GRPC.TLS = tlsGrpc - } else { - return cOpts, fmt.Errorf("failed to parse gRPC TLS options: %w", err) - } - cOpts.HTTP.HostPort = ports.FormatHostPort(v.GetString(collectorHTTPHostPort)) - if tlsHTTP, err := tlsHTTPFlagsConfig.InitFromViper(v); err == nil { - cOpts.HTTP.TLS = tlsHTTP - } else { - return cOpts, fmt.Errorf("failed to parse HTTP TLS options: %w", err) - } - cOpts.Zipkin.AllowedHeaders = v.GetString(collectorZipkinAllowedHeaders) - cOpts.Zipkin.AllowedOrigins = v.GetString(collectorZipkinAllowedOrigins) - cOpts.Zipkin.HTTPHostPort = ports.FormatHostPort(v.GetString(collectorZipkinHTTPHostPort)) - if tlsZipkin, err := tlsZipkinFlagsConfig.InitFromViper(v); err == nil { - cOpts.Zipkin.TLS = tlsZipkin - } else { - return cOpts, fmt.Errorf("failed to parse Zipkin TLS options: %w", err) - } - cOpts.CollectorTags = flags.ParseJaegerTags(v.GetString(collectorTags)) - cOpts.DynQueueSizeMemory = v.GetUint(collectorDynQueueSizeMemory) * 1024 * 1024 // we receive in MiB and store in bytes - cOpts.NumWorkers = v.GetInt(collectorNumWorkers) - cOpts.QueueSize = v.GetInt(collectorQueueSize) - - return cOpts, nil -} diff --git a/cmd/collector/app/flags/flags.go b/cmd/collector/app/flags/flags.go new file mode 100644 index 00000000000..77641d64871 --- /dev/null +++ b/cmd/collector/app/flags/flags.go @@ -0,0 +1,259 @@ +// Copyright (c) 2019 The Jaeger Authors. +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flags + +import ( + "flag" + "fmt" + "time" + + "github.com/spf13/viper" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/flags" + "github.com/jaegertracing/jaeger/pkg/config/tlscfg" + "github.com/jaegertracing/jaeger/ports" +) + +const ( + flagDynQueueSizeMemory = "collector.queue-size-memory" + flagNumWorkers = "collector.num-workers" + flagQueueSize = "collector.queue-size" + flagCollectorTags = "collector.tags" + + flagSuffixHostPort = "host-port" + + flagSuffixGRPCMaxReceiveMessageLength = "max-message-size" + flagSuffixGRPCMaxConnectionAge = "max-connection-age" + flagSuffixGRPCMaxConnectionAgeGrace = "max-connection-age-grace" + + flagCollectorOTLPEnabled = "collector.otlp.enabled" + + flagZipkinHTTPHostPort = "collector.zipkin.host-port" + flagZipkinAllowedHeaders = "collector.zipkin.allowed-headers" + flagZipkinAllowedOrigins = "collector.zipkin.allowed-origins" + + // DefaultNumWorkers is the default number of workers consuming from the processor queue + DefaultNumWorkers = 50 + // DefaultQueueSize is the size of the processor's queue + DefaultQueueSize = 2000 + // DefaultGRPCMaxReceiveMessageLength is the default max receivable message size for the gRPC Collector + DefaultGRPCMaxReceiveMessageLength = 4 * 1024 * 1024 +) + +var grpcServerFlagsCfg = serverFlagsConfig{ + // for legacy reasons the prefixes are different + prefix: "collector.grpc-server", + tls: tlscfg.ServerFlagsConfig{ + Prefix: "collector.grpc", + }, +} + +var httpServerFlagsCfg = serverFlagsConfig{ + // for legacy reasons the prefixes are different + prefix: "collector.http-server", + tls: tlscfg.ServerFlagsConfig{ + Prefix: "collector.http", + }, +} + +var otlpServerFlagsCfg = struct { + GRPC serverFlagsConfig + HTTP serverFlagsConfig +}{ + GRPC: serverFlagsConfig{ + prefix: "collector.otlp.grpc", + tls: tlscfg.ServerFlagsConfig{ + Prefix: "collector.otlp.grpc", + }, + }, + HTTP: serverFlagsConfig{ + prefix: "collector.otlp.http", + tls: tlscfg.ServerFlagsConfig{ + Prefix: "collector.otlp.http", + }, + }, +} + +var tlsZipkinFlagsConfig = tlscfg.ServerFlagsConfig{ + Prefix: "collector.zipkin", +} + +// CollectorOptions holds configuration for collector +type CollectorOptions struct { + // DynQueueSizeMemory determines how much memory to use for the queue + DynQueueSizeMemory uint + // QueueSize is the size of collector's queue + QueueSize int + // NumWorkers is the number of internal workers in a collector + NumWorkers int + // HTTP section defines options for HTTP server + HTTP HTTPOptions + // GRPC section defines options for gRPC server + GRPC GRPCOptions + // OTLP section defines options for servers accepting OpenTelemetry OTLP format + OTLP struct { + Enabled bool + GRPC GRPCOptions + HTTP HTTPOptions + } + // Zipkin section defines options for Zipkin HTTP server + Zipkin struct { + // HTTPHostPort is the host:port address that the Zipkin collector service listens in on for http requests + HTTPHostPort string + // ZipkinAllowedOrigins is a list of origins a cross-domain request to the Zipkin collector service can be executed from + AllowedOrigins string + // ZipkinAllowedHeaders is a list of headers that the Zipkin collector service allowes the client to use with cross-domain requests + AllowedHeaders string + // TLS configures secure transport for Zipkin endpoint to collect spans + TLS tlscfg.Options + } + // CollectorTags is the string representing collector tags to append to each and every span + CollectorTags map[string]string +} + +type serverFlagsConfig struct { + prefix string + tls tlscfg.ServerFlagsConfig +} + +// HTTPOptions defines options for an HTTP server +type HTTPOptions struct { + // HostPort is the host:port address that the server listens on + HostPort string + // TLS configures secure transport for HTTP endpoint + TLS tlscfg.Options +} + +// GRPCOptions defines options for a gRPC server +type GRPCOptions struct { + // HostPort is the host:port address that the collector service listens in on for gRPC requests + HostPort string + // TLS configures secure transport for gRPC endpoint to collect spans + TLS tlscfg.Options + // MaxReceiveMessageLength is the maximum message size receivable by the gRPC Collector. + MaxReceiveMessageLength int + // MaxConnectionAge is a duration for the maximum amount of time a connection may exist. + // See gRPC's keepalive.ServerParameters#MaxConnectionAge. + MaxConnectionAge time.Duration + // MaxConnectionAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed. + // See gRPC's keepalive.ServerParameters#MaxConnectionAgeGrace. + MaxConnectionAgeGrace time.Duration +} + +// AddFlags adds flags for CollectorOptions +func AddFlags(flags *flag.FlagSet) { + flags.Int(flagNumWorkers, DefaultNumWorkers, "The number of workers pulling items from the queue") + flags.Int(flagQueueSize, DefaultQueueSize, "The queue size of the collector") + flags.Uint(flagDynQueueSizeMemory, 0, "(experimental) The max memory size in MiB to use for the dynamic queue.") + flags.String(flagCollectorTags, "", "One or more tags to be added to the Process tags of all spans passing through this collector. Ex: key1=value1,key2=${envVar:defaultValue}") + + addHTTPFlags(flags, httpServerFlagsCfg, ports.PortToHostPort(ports.CollectorHTTP)) + addGRPCFlags(flags, grpcServerFlagsCfg, ports.PortToHostPort(ports.CollectorGRPC)) + + flags.Bool(flagCollectorOTLPEnabled, false, "Enables OpenTelemetry OTLP receiver on dedicated HTTP and gRPC ports") + addHTTPFlags(flags, otlpServerFlagsCfg.HTTP, "") + addGRPCFlags(flags, otlpServerFlagsCfg.GRPC, "") + + flags.String(flagZipkinAllowedHeaders, "content-type", "Comma separated list of allowed headers for the Zipkin collector service, default content-type") + flags.String(flagZipkinAllowedOrigins, "*", "Comma separated list of allowed origins for the Zipkin collector service, default accepts all") + flags.String(flagZipkinHTTPHostPort, "", "The host:port (e.g. 127.0.0.1:9411 or :9411) of the collector's Zipkin server (disabled by default)") + tlsZipkinFlagsConfig.AddFlags(flags) +} + +func addHTTPFlags(flags *flag.FlagSet, cfg serverFlagsConfig, defaultHostPort string) { + flags.String(cfg.prefix+"."+flagSuffixHostPort, defaultHostPort, "The host:port (e.g. 127.0.0.1:12345 or :12345) of the collector's HTTP server") + cfg.tls.AddFlags(flags) +} + +func addGRPCFlags(flags *flag.FlagSet, cfg serverFlagsConfig, defaultHostPort string) { + flags.String( + cfg.prefix+"."+flagSuffixHostPort, + defaultHostPort, + "The host:port (e.g. 127.0.0.1:12345 or :12345) of the collector's gRPC server") + flags.Int( + cfg.prefix+"."+flagSuffixGRPCMaxReceiveMessageLength, + DefaultGRPCMaxReceiveMessageLength, + "The maximum receivable message size for the collector's gRPC server") + flags.Duration( + cfg.prefix+"."+flagSuffixGRPCMaxConnectionAge, + 0, + "The maximum amount of time a connection may exist. Set this value to a few seconds or minutes on highly elastic environments, so that clients discover new collector nodes frequently. See https://pkg.go.dev/google.golang.org/grpc/keepalive#ServerParameters") + flags.Duration( + cfg.prefix+"."+flagSuffixGRPCMaxConnectionAgeGrace, + 0, + "The additive period after MaxConnectionAge after which the connection will be forcibly closed. See https://pkg.go.dev/google.golang.org/grpc/keepalive#ServerParameters") + cfg.tls.AddFlags(flags) +} + +func (opts *HTTPOptions) initFromViper(v *viper.Viper, logger *zap.Logger, cfg serverFlagsConfig) error { + opts.HostPort = ports.FormatHostPort(v.GetString(cfg.prefix + "." + flagSuffixHostPort)) + if tlsOpts, err := cfg.tls.InitFromViper(v); err == nil { + opts.TLS = tlsOpts + } else { + return fmt.Errorf("failed to parse HTTP TLS options: %w", err) + } + return nil +} + +func (opts *GRPCOptions) initFromViper(v *viper.Viper, logger *zap.Logger, cfg serverFlagsConfig) error { + opts.HostPort = ports.FormatHostPort(v.GetString(cfg.prefix + "." + flagSuffixHostPort)) + opts.MaxReceiveMessageLength = v.GetInt(cfg.prefix + "." + flagSuffixGRPCMaxReceiveMessageLength) + opts.MaxConnectionAge = v.GetDuration(cfg.prefix + "." + flagSuffixGRPCMaxConnectionAge) + opts.MaxConnectionAgeGrace = v.GetDuration(cfg.prefix + "." + flagSuffixGRPCMaxConnectionAgeGrace) + if tlsOpts, err := cfg.tls.InitFromViper(v); err == nil { + opts.TLS = tlsOpts + } else { + return fmt.Errorf("failed to parse gRPC TLS options: %w", err) + } + + return nil +} + +// InitFromViper initializes CollectorOptions with properties from viper +func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) (*CollectorOptions, error) { + cOpts.CollectorTags = flags.ParseJaegerTags(v.GetString(flagCollectorTags)) + cOpts.NumWorkers = v.GetInt(flagNumWorkers) + cOpts.QueueSize = v.GetInt(flagQueueSize) + cOpts.DynQueueSizeMemory = v.GetUint(flagDynQueueSizeMemory) * 1024 * 1024 // we receive in MiB and store in bytes + + if err := cOpts.HTTP.initFromViper(v, logger, httpServerFlagsCfg); err != nil { + return cOpts, fmt.Errorf("failed to parse HTTP server options: %w", err) + } + + if err := cOpts.GRPC.initFromViper(v, logger, grpcServerFlagsCfg); err != nil { + return cOpts, fmt.Errorf("failed to parse gRPC server options: %w", err) + } + + cOpts.OTLP.Enabled = v.GetBool(flagCollectorOTLPEnabled) + if err := cOpts.OTLP.HTTP.initFromViper(v, logger, otlpServerFlagsCfg.HTTP); err != nil { + return cOpts, fmt.Errorf("failed to parse OTLP/HTTP server options: %w", err) + } + if err := cOpts.OTLP.GRPC.initFromViper(v, logger, otlpServerFlagsCfg.GRPC); err != nil { + return cOpts, fmt.Errorf("failed to parse OTLP/gRPC server options: %w", err) + } + + cOpts.Zipkin.AllowedHeaders = v.GetString(flagZipkinAllowedHeaders) + cOpts.Zipkin.AllowedOrigins = v.GetString(flagZipkinAllowedOrigins) + cOpts.Zipkin.HTTPHostPort = ports.FormatHostPort(v.GetString(flagZipkinHTTPHostPort)) + if tlsZipkin, err := tlsZipkinFlagsConfig.InitFromViper(v); err == nil { + cOpts.Zipkin.TLS = tlsZipkin + } else { + return cOpts, fmt.Errorf("failed to parse Zipkin TLS options: %w", err) + } + + return cOpts, nil +} diff --git a/cmd/collector/app/flags_test.go b/cmd/collector/app/flags/flags_test.go similarity index 65% rename from cmd/collector/app/flags_test.go rename to cmd/collector/app/flags/flags_test.go index 3b3b5b30a79..c9299d26bcb 100644 --- a/cmd/collector/app/flags_test.go +++ b/cmd/collector/app/flags/flags_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package app +package flags import ( "testing" @@ -20,6 +20,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/config" ) @@ -32,7 +33,8 @@ func TestCollectorOptionsWithFlags_CheckHostPort(t *testing.T) { "--collector.grpc-server.host-port=1234", "--collector.zipkin.host-port=3456", }) - c.InitFromViper(v) + _, err := c.InitFromViper(v, zap.NewNop()) + require.NoError(t, err) assert.Equal(t, ":5678", c.HTTP.HostPort) assert.Equal(t, ":1234", c.GRPC.HostPort) @@ -47,50 +49,36 @@ func TestCollectorOptionsWithFlags_CheckFullHostPort(t *testing.T) { "--collector.grpc-server.host-port=127.0.0.1:1234", "--collector.zipkin.host-port=0.0.0.0:3456", }) - c.InitFromViper(v) + _, err := c.InitFromViper(v, zap.NewNop()) + require.NoError(t, err) assert.Equal(t, ":5678", c.HTTP.HostPort) assert.Equal(t, "127.0.0.1:1234", c.GRPC.HostPort) assert.Equal(t, "0.0.0.0:3456", c.Zipkin.HTTPHostPort) } -func TestCollectorOptionsWithFailedHTTPFlags(t *testing.T) { - c := &CollectorOptions{} - v, command := config.Viperize(AddFlags) - err := command.ParseFlags([]string{ - "--collector.http.tls.enabled=false", - "--collector.http.tls.cert=blah", // invalid unless tls.enabled - }) - require.NoError(t, err) - _, err = c.InitFromViper(v) - require.Error(t, err) - assert.Contains(t, err.Error(), "failed to parse HTTP TLS options") -} - -func TestCollectorOptionsWithFailedGRPCFlags(t *testing.T) { - c := &CollectorOptions{} - v, command := config.Viperize(AddFlags) - err := command.ParseFlags([]string{ - "--collector.grpc.tls.enabled=false", - "--collector.grpc.tls.cert=blah", // invalid unless tls.enabled - }) - require.NoError(t, err) - _, err = c.InitFromViper(v) - require.Error(t, err) - assert.Contains(t, err.Error(), "failed to parse gRPC TLS options") -} - -func TestCollectorOptionsWithFailedZipkinFlags(t *testing.T) { - c := &CollectorOptions{} - v, command := config.Viperize(AddFlags) - err := command.ParseFlags([]string{ - "--collector.zipkin.tls.enabled=false", - "--collector.zipkin.tls.cert=blah", // invalid unless tls.enabled - }) - require.NoError(t, err) - _, err = c.InitFromViper(v) - require.Error(t, err) - assert.Contains(t, err.Error(), "failed to parse Zipkin TLS options") +func TestCollectorOptionsWithFailedTLSFlags(t *testing.T) { + prefixes := []string{ + "--collector.http", + "--collector.grpc", + "--collector.zipkin", + "--collector.otlp.http", + "--collector.otlp.grpc", + } + for _, prefix := range prefixes { + t.Run(prefix, func(t *testing.T) { + c := &CollectorOptions{} + v, command := config.Viperize(AddFlags) + err := command.ParseFlags([]string{ + prefix + ".tls.enabled=false", + prefix + ".tls.cert=blah", // invalid unless tls.enabled + }) + require.NoError(t, err) + _, err = c.InitFromViper(v, zap.NewNop()) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to parse") + }) + } } func TestCollectorOptionsWithFlags_CheckMaxReceiveMessageLength(t *testing.T) { @@ -99,7 +87,8 @@ func TestCollectorOptionsWithFlags_CheckMaxReceiveMessageLength(t *testing.T) { command.ParseFlags([]string{ "--collector.grpc-server.max-message-size=8388608", }) - c.InitFromViper(v) + _, err := c.InitFromViper(v, zap.NewNop()) + require.NoError(t, err) assert.Equal(t, 8388608, c.GRPC.MaxReceiveMessageLength) } @@ -111,7 +100,8 @@ func TestCollectorOptionsWithFlags_CheckMaxConnectionAge(t *testing.T) { "--collector.grpc-server.max-connection-age=5m", "--collector.grpc-server.max-connection-age-grace=1m", }) - c.InitFromViper(v) + _, err := c.InitFromViper(v, zap.NewNop()) + require.NoError(t, err) assert.Equal(t, 5*time.Minute, c.GRPC.MaxConnectionAge) assert.Equal(t, time.Minute, c.GRPC.MaxConnectionAgeGrace) diff --git a/cmd/collector/app/handler/otlp_receiver.go b/cmd/collector/app/handler/otlp_receiver.go index 91fcfc0208f..41da6163bf4 100644 --- a/cmd/collector/app/handler/otlp_receiver.go +++ b/cmd/collector/app/handler/otlp_receiver.go @@ -21,34 +21,52 @@ import ( otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver/otlpreceiver" "go.opentelemetry.io/otel" "go.uber.org/zap" + "github.com/jaegertracing/jaeger/cmd/collector/app/flags" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/config/tlscfg" ) var _ component.Host = (*otelHost)(nil) // API check -// OtelReceiverOptions allows configuration of the receiver. -type OtelReceiverOptions struct { - GRPCHostPort string - HTTPHostPort string +// StartOTLPReceiver starts OpenTelemetry OTLP receiver listening on gRPC and HTTP ports. +func StartOTLPReceiver(options *flags.CollectorOptions, logger *zap.Logger, spanProcessor processor.SpanProcessor) (component.TracesReceiver, error) { + otlpFactory := otlpreceiver.NewFactory() + return startOTLPReceiver( + options, + logger, + spanProcessor, + otlpFactory, + consumer.NewTraces, + otlpFactory.CreateTracesReceiver, + ) } -// StartOtelReceiver starts OpenTelemetry OTLP receiver listening on gRPC and HTTP ports. -func StartOtelReceiver(options OtelReceiverOptions, logger *zap.Logger, spanProcessor processor.SpanProcessor) (component.TracesReceiver, error) { - otlpFactory := otlpreceiver.NewFactory() +// Some of OTELCOL constructor functions return errors when passed nil arguments, +// which is a situation we cannot reproduce. To test our own error handling, this +// function allows to mock those constructors. +func startOTLPReceiver( + options *flags.CollectorOptions, + logger *zap.Logger, + spanProcessor processor.SpanProcessor, + // from here: params that can be mocked in tests + otlpFactory component.ReceiverFactory, + newTraces func(consume consumer.ConsumeTracesFunc, options ...consumer.Option) (consumer.Traces, error), + createTracesReceiver func(ctx context.Context, set component.ReceiverCreateSettings, + cfg config.Receiver, nextConsumer consumer.Traces) (component.TracesReceiver, error), +) (component.TracesReceiver, error) { otlpReceiverConfig := otlpFactory.CreateDefaultConfig().(*otlpreceiver.Config) - if options.GRPCHostPort != "" { - otlpReceiverConfig.GRPC.NetAddr.Endpoint = options.GRPCHostPort - } - if options.HTTPHostPort != "" { - otlpReceiverConfig.HTTP.Endpoint = options.HTTPHostPort - } + applyGRPCSettings(otlpReceiverConfig.GRPC, &options.OTLP.GRPC) + applyHTTPSettings(otlpReceiverConfig.HTTP, &options.OTLP.HTTP) otlpReceiverSettings := component.ReceiverCreateSettings{ TelemetrySettings: component.TelemetrySettings{ Logger: logger, @@ -58,19 +76,67 @@ func StartOtelReceiver(options OtelReceiverOptions, logger *zap.Logger, spanProc otlpConsumer := newConsumerDelegate(logger, spanProcessor) // the following two constructors never return errors given non-nil arguments, so we ignore errors - nextConsumer, _ := consumer.NewTraces(otlpConsumer.consume) - otlpReceiver, _ := otlpFactory.CreateTracesReceiver( + nextConsumer, err := newTraces(otlpConsumer.consume) + if err != nil { + return nil, fmt.Errorf("could not create the OTLP consumer: %w", err) + } + otlpReceiver, err := createTracesReceiver( context.Background(), otlpReceiverSettings, otlpReceiverConfig, nextConsumer, ) + if err != nil { + return nil, fmt.Errorf("could not create the OTLP receiver: %w", err) + } if err := otlpReceiver.Start(context.Background(), &otelHost{logger: logger}); err != nil { return nil, fmt.Errorf("could not start the OTLP receiver: %w", err) } return otlpReceiver, nil } +func applyGRPCSettings(cfg *configgrpc.GRPCServerSettings, opts *flags.GRPCOptions) { + if opts.HostPort != "" { + cfg.NetAddr.Endpoint = opts.HostPort + } + if opts.TLS.Enabled { + cfg.TLSSetting = applyTLSSettings(&opts.TLS) + } + if opts.MaxReceiveMessageLength > 0 { + cfg.MaxRecvMsgSizeMiB = uint64(opts.MaxReceiveMessageLength / (1024 * 1024)) + } + if opts.MaxConnectionAge != 0 || opts.MaxConnectionAgeGrace != 0 { + cfg.Keepalive = &configgrpc.KeepaliveServerConfig{ + ServerParameters: &configgrpc.KeepaliveServerParameters{ + MaxConnectionAge: opts.MaxConnectionAge, + MaxConnectionAgeGrace: opts.MaxConnectionAgeGrace, + }, + } + } +} + +func applyHTTPSettings(cfg *confighttp.HTTPServerSettings, opts *flags.HTTPOptions) { + if opts.HostPort != "" { + cfg.Endpoint = opts.HostPort + } + if opts.TLS.Enabled { + cfg.TLSSetting = applyTLSSettings(&opts.TLS) + } +} + +func applyTLSSettings(opts *tlscfg.Options) *configtls.TLSServerSetting { + return &configtls.TLSServerSetting{ + TLSSetting: configtls.TLSSetting{ + CAFile: opts.CAPath, + CertFile: opts.CertPath, + KeyFile: opts.KeyPath, + MinVersion: opts.MinVersion, + MaxVersion: opts.MaxVersion, + }, + ClientCAFile: opts.ClientCAPath, + } +} + func newConsumerDelegate(logger *zap.Logger, spanProcessor processor.SpanProcessor) *consumerDelegate { return &consumerDelegate{ batchConsumer: batchConsumer{ diff --git a/cmd/collector/app/handler/otlp_receiver_test.go b/cmd/collector/app/handler/otlp_receiver_test.go index a9eeb8c7dd8..83f17e412e1 100644 --- a/cmd/collector/app/handler/otlp_receiver_test.go +++ b/cmd/collector/app/handler/otlp_receiver_test.go @@ -18,28 +18,37 @@ import ( "context" "errors" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/receiver/otlpreceiver" + "github.com/jaegertracing/jaeger/cmd/collector/app/flags" "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/config/tlscfg" "github.com/jaegertracing/jaeger/pkg/testutils" ) +func optionsWithPorts(port string) *flags.CollectorOptions { + opts := &flags.CollectorOptions{} + opts.OTLP.GRPC = flags.GRPCOptions{ + HostPort: port, + } + opts.OTLP.HTTP = flags.HTTPOptions{ + HostPort: port, + } + return opts +} + func TestStartOtlpReceiver(t *testing.T) { spanProcessor := &mockSpanProcessor{} logger, _ := testutils.NewLogger() - rec, err := StartOtelReceiver( - OtelReceiverOptions{ - GRPCHostPort: ":0", - HTTPHostPort: ":0", - }, - logger, - spanProcessor, - ) + rec, err := StartOTLPReceiver(optionsWithPorts(":0"), logger, spanProcessor) require.NoError(t, err) defer func() { assert.NoError(t, rec.Shutdown(context.Background())) @@ -88,16 +97,29 @@ func TestConsumerDelegate(t *testing.T) { func TestStartOtlpReceiver_Error(t *testing.T) { spanProcessor := &mockSpanProcessor{} logger, _ := testutils.NewLogger() - _, err := StartOtelReceiver( - OtelReceiverOptions{ - GRPCHostPort: ":-1", - HTTPHostPort: ":-1", - }, - logger, - spanProcessor, - ) - assert.Error(t, err) + opts := optionsWithPorts(":-1") + _, err := StartOTLPReceiver(opts, logger, spanProcessor) + require.Error(t, err) + assert.Contains(t, err.Error(), "could not start the OTLP receiver") + + newTraces := func(consumer.ConsumeTracesFunc, ...consumer.Option) (consumer.Traces, error) { + return nil, errors.New("mock error") + } + f := otlpreceiver.NewFactory() + _, err = startOTLPReceiver(opts, logger, spanProcessor, f, newTraces, f.CreateTracesReceiver) + require.Error(t, err) + assert.Contains(t, err.Error(), "could not create the OTLP consumer") + + createTracesReceiver := func( + context.Context, component.ReceiverCreateSettings, config.Receiver, consumer.Traces, + ) (component.TracesReceiver, error) { + return nil, errors.New("mock error") + } + _, err = startOTLPReceiver(opts, logger, spanProcessor, f, consumer.NewTraces, createTracesReceiver) + require.Error(t, err) + assert.Contains(t, err.Error(), "could not create the OTLP receiver") } + func TestProtoFromTracesError(t *testing.T) { mockErr := errors.New("mock error") c := &consumerDelegate{ @@ -127,3 +149,69 @@ func TestOtelHost(t *testing.T) { assert.Nil(t, host.GetExtensions()) assert.Nil(t, host.GetExporters()) } + +func TestApplyOTLPGRPCServerSettings(t *testing.T) { + otlpFactory := otlpreceiver.NewFactory() + otlpReceiverConfig := otlpFactory.CreateDefaultConfig().(*otlpreceiver.Config) + + grpcOpts := &flags.GRPCOptions{ + HostPort: ":54321", + MaxReceiveMessageLength: 42 * 1024 * 1024, + MaxConnectionAge: 33 * time.Second, + MaxConnectionAgeGrace: 37 * time.Second, + TLS: tlscfg.Options{ + Enabled: true, + CAPath: "ca", + CertPath: "cert", + KeyPath: "key", + ClientCAPath: "clientca", + MinVersion: "1.1", + MaxVersion: "1.3", + }, + } + applyGRPCSettings(otlpReceiverConfig.GRPC, grpcOpts) + out := otlpReceiverConfig.GRPC + assert.Equal(t, out.NetAddr.Endpoint, ":54321") + assert.EqualValues(t, out.MaxRecvMsgSizeMiB, 42) + require.NotNil(t, out.Keepalive) + require.NotNil(t, out.Keepalive.ServerParameters) + assert.Equal(t, out.Keepalive.ServerParameters.MaxConnectionAge, 33*time.Second) + assert.Equal(t, out.Keepalive.ServerParameters.MaxConnectionAgeGrace, 37*time.Second) + require.NotNil(t, out.TLSSetting) + assert.Equal(t, out.TLSSetting.CAFile, "ca") + assert.Equal(t, out.TLSSetting.CertFile, "cert") + assert.Equal(t, out.TLSSetting.KeyFile, "key") + assert.Equal(t, out.TLSSetting.ClientCAFile, "clientca") + assert.Equal(t, out.TLSSetting.MinVersion, "1.1") + assert.Equal(t, out.TLSSetting.MaxVersion, "1.3") +} + +func TestApplyOTLPHTTPServerSettings(t *testing.T) { + otlpFactory := otlpreceiver.NewFactory() + otlpReceiverConfig := otlpFactory.CreateDefaultConfig().(*otlpreceiver.Config) + + httpOpts := &flags.HTTPOptions{ + HostPort: ":12345", + TLS: tlscfg.Options{ + Enabled: true, + CAPath: "ca", + CertPath: "cert", + KeyPath: "key", + ClientCAPath: "clientca", + MinVersion: "1.1", + MaxVersion: "1.3", + }, + } + + applyHTTPSettings(otlpReceiverConfig.HTTP, httpOpts) + + out := otlpReceiverConfig.HTTP + assert.Equal(t, out.Endpoint, ":12345") + require.NotNil(t, out.TLSSetting) + assert.Equal(t, out.TLSSetting.CAFile, "ca") + assert.Equal(t, out.TLSSetting.CertFile, "cert") + assert.Equal(t, out.TLSSetting.KeyFile, "key") + assert.Equal(t, out.TLSSetting.ClientCAFile, "clientca") + assert.Equal(t, out.TLSSetting.MinVersion, "1.1") + assert.Equal(t, out.TLSSetting.MaxVersion, "1.3") +} diff --git a/cmd/collector/app/options.go b/cmd/collector/app/options.go index 44fae71cc82..9847d8e7245 100644 --- a/cmd/collector/app/options.go +++ b/cmd/collector/app/options.go @@ -19,20 +19,12 @@ import ( "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" + "github.com/jaegertracing/jaeger/cmd/collector/app/flags" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer" "github.com/jaegertracing/jaeger/model" ) -const ( - // DefaultNumWorkers is the default number of workers consuming from the processor queue - DefaultNumWorkers = 50 - // DefaultQueueSize is the size of the processor's queue - DefaultQueueSize = 2000 - // DefaultGRPCMaxReceiveMessageLength is the default max receivable message size for the gRPC Collector - DefaultGRPCMaxReceiveMessageLength = 4 * 1024 * 1024 -) - type options struct { logger *zap.Logger serviceMetrics metrics.Factory @@ -191,7 +183,7 @@ func (o options) apply(opts ...Option) options { ret.spanFilter = func(span *model.Span) bool { return true } } if ret.numWorkers == 0 { - ret.numWorkers = DefaultNumWorkers + ret.numWorkers = flags.DefaultNumWorkers } return ret } diff --git a/cmd/collector/app/options_test.go b/cmd/collector/app/options_test.go index 00e79f99a0e..6a92344b0fd 100644 --- a/cmd/collector/app/options_test.go +++ b/cmd/collector/app/options_test.go @@ -22,6 +22,7 @@ import ( "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" + "github.com/jaegertracing/jaeger/cmd/collector/app/flags" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/model" ) @@ -54,7 +55,7 @@ func TestAllOptionSet(t *testing.T) { func TestNoOptionsSet(t *testing.T) { opts := Options.apply() - assert.EqualValues(t, DefaultNumWorkers, opts.numWorkers) + assert.EqualValues(t, flags.DefaultNumWorkers, opts.numWorkers) assert.EqualValues(t, 0, opts.queueSize) assert.Nil(t, opts.collectorTags) assert.False(t, opts.reportBusy) diff --git a/cmd/collector/app/span_handler_builder.go b/cmd/collector/app/span_handler_builder.go index 69e5aac865e..362b6f1764c 100644 --- a/cmd/collector/app/span_handler_builder.go +++ b/cmd/collector/app/span_handler_builder.go @@ -21,6 +21,7 @@ import ( "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" + "github.com/jaegertracing/jaeger/cmd/collector/app/flags" "github.com/jaegertracing/jaeger/cmd/collector/app/handler" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" zs "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin" @@ -31,7 +32,7 @@ import ( // SpanHandlerBuilder holds configuration required for handlers type SpanHandlerBuilder struct { SpanWriter spanstore.Writer - CollectorOpts CollectorOptions + CollectorOpts *flags.CollectorOptions Logger *zap.Logger MetricsFactory metrics.Factory } diff --git a/cmd/collector/app/span_handler_builder_test.go b/cmd/collector/app/span_handler_builder_test.go index cac63f818ec..0130165ac69 100644 --- a/cmd/collector/app/span_handler_builder_test.go +++ b/cmd/collector/app/span_handler_builder_test.go @@ -23,30 +23,31 @@ import ( "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" - "github.com/jaegertracing/jaeger/cmd/flags" + "github.com/jaegertracing/jaeger/cmd/collector/app/flags" + cmdFlags "github.com/jaegertracing/jaeger/cmd/flags" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/plugin/storage/memory" ) func TestNewSpanHandlerBuilder(t *testing.T) { - v, command := config.Viperize(flags.AddFlags, AddFlags) + v, command := config.Viperize(cmdFlags.AddFlags, flags.AddFlags) require.NoError(t, command.ParseFlags([]string{})) - cOpts, err := new(CollectorOptions).InitFromViper(v) + cOpts, err := new(flags.CollectorOptions).InitFromViper(v, zap.NewNop()) require.NoError(t, err) spanWriter := memory.NewStore() builder := &SpanHandlerBuilder{ SpanWriter: spanWriter, - CollectorOpts: *cOpts, + CollectorOpts: cOpts, } assert.NotNil(t, builder.logger()) assert.NotNil(t, builder.metricsFactory()) builder = &SpanHandlerBuilder{ SpanWriter: spanWriter, - CollectorOpts: *cOpts, + CollectorOpts: cOpts, Logger: zap.NewNop(), MetricsFactory: metrics.NullFactory, } diff --git a/cmd/collector/main.go b/cmd/collector/main.go index ac173033974..79b9a2e09bb 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -30,9 +30,10 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/collector/app" + "github.com/jaegertracing/jaeger/cmd/collector/app/flags" "github.com/jaegertracing/jaeger/cmd/docs" "github.com/jaegertracing/jaeger/cmd/env" - "github.com/jaegertracing/jaeger/cmd/flags" + cmdFlags "github.com/jaegertracing/jaeger/cmd/flags" "github.com/jaegertracing/jaeger/cmd/status" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/version" @@ -44,7 +45,7 @@ import ( const serviceName = "jaeger-collector" func main() { - svc := flags.NewService(ports.CollectorAdminHTTP) + svc := cmdFlags.NewService(ports.CollectorAdminHTTP) storageFactory, err := storage.NewFactory(storage.FactoryConfigFromEnvAndCLI(os.Args, os.Stderr)) if err != nil { @@ -106,7 +107,7 @@ func main() { Aggregator: aggregator, HealthCheck: svc.HC(), }) - collectorOpts, err := new(app.CollectorOptions).InitFromViper(v) + collectorOpts, err := new(flags.CollectorOptions).InitFromViper(v, logger) if err != nil { logger.Fatal("Failed to initialize collector", zap.Error(err)) } @@ -143,7 +144,7 @@ func main() { v, command, svc.AddFlags, - app.AddFlags, + flags.AddFlags, storageFactory.AddPipelineFlags, strategyStoreFactory.AddFlags, )