Skip to content

Commit

Permalink
more tests
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro committed May 29, 2022
1 parent cb6a8c9 commit 9e18c86
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 84 deletions.
10 changes: 6 additions & 4 deletions cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,13 @@ func (c *Collector) Start(options *flags.CollectorOptions) error {
}
c.zkServer = zkServer

otlpReceiver, err := handler.StartOtelReceiver(options, c.logger, c.spanProcessor)
if err != nil {
return fmt.Errorf("could not start OTLP receiver: %w", err)
if options.OTLP.Enabled {
otlpReceiver, err := handler.StartOTLPReceiver(options, c.logger, c.spanProcessor)
if err != nil {
return fmt.Errorf("could not start OTLP receiver: %w", err)
}
c.otlpReceiver = otlpReceiver
}
c.otlpReceiver = otlpReceiver

c.publishOpts(options)

Expand Down
4 changes: 4 additions & 0 deletions cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ func optionsForEphemeralPorts() *flags.CollectorOptions {
collectorOpts := &flags.CollectorOptions{}
collectorOpts.GRPC.HostPort = ":0"
collectorOpts.HTTP.HostPort = ":0"
collectorOpts.OTLP.Enabled = true
collectorOpts.OTLP.GRPC.HostPort = ":0"
collectorOpts.OTLP.HTTP.HostPort = ":0"
collectorOpts.Zipkin.HTTPHostPort = ":0"
return collectorOpts
}

Expand All @@ -60,8 +62,10 @@ func TestNewCollector(t *testing.T) {
StrategyStore: strategyStore,
HealthCheck: hc,
})

collectorOpts := optionsForEphemeralPorts()
require.NoError(t, c.Start(collectorOpts))
assert.NotNil(t, c.SpanHandlers())
assert.NoError(t, c.Close())
}

Expand Down
10 changes: 7 additions & 3 deletions cmd/collector/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const (
flagSuffixGRPCMaxConnectionAge = "max-connection-age"
flagSuffixGRPCMaxConnectionAgeGrace = "max-connection-age-grace"

flagCollectorOTLPEnabled = "collector.otlp.enabled"

flagZipkinHTTPHostPort = "collector.zipkin.host-port"
flagZipkinAllowedHeaders = "collector.zipkin.allowed-headers"
flagZipkinAllowedOrigins = "collector.zipkin.allowed-origins"
Expand Down Expand Up @@ -104,8 +106,9 @@ type CollectorOptions struct {
GRPC GRPCOptions
// OTLP section defines options for servers accepting OpenTelemetry OTLP format
OTLP struct {
GRPC GRPCOptions
HTTP HTTPOptions
Enabled bool
GRPC GRPCOptions
HTTP HTTPOptions
}
// Zipkin section defines options for Zipkin HTTP server
Zipkin struct {
Expand Down Expand Up @@ -161,6 +164,7 @@ func AddFlags(flags *flag.FlagSet) {
addHTTPFlags(flags, httpServerFlagsCfg, ports.PortToHostPort(ports.CollectorHTTP))
addGRPCFlags(flags, grpcServerFlagsCfg, ports.PortToHostPort(ports.CollectorGRPC))

flags.Bool(flagCollectorOTLPEnabled, false, "Enables OpenTelemetry OTLP receiver on dedicated HTTP and gRPC ports")
addHTTPFlags(flags, otlpServerFlagsCfg.HTTP, "")
addGRPCFlags(flags, otlpServerFlagsCfg.GRPC, "")

Expand Down Expand Up @@ -234,10 +238,10 @@ func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper, logger *zap.Logger)
return cOpts, fmt.Errorf("failed to parse gRPC server options: %w", err)
}

cOpts.OTLP.Enabled = v.GetBool(flagCollectorOTLPEnabled)
if err := cOpts.OTLP.HTTP.initFromViper(v, logger, otlpServerFlagsCfg.HTTP); err != nil {
return cOpts, fmt.Errorf("failed to parse OTLP/HTTP server options: %w", err)
}

if err := cOpts.OTLP.GRPC.initFromViper(v, logger, otlpServerFlagsCfg.GRPC); err != nil {
return cOpts, fmt.Errorf("failed to parse OTLP/gRPC server options: %w", err)
}
Expand Down
59 changes: 22 additions & 37 deletions cmd/collector/app/flags/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,43 +57,28 @@ func TestCollectorOptionsWithFlags_CheckFullHostPort(t *testing.T) {
assert.Equal(t, "0.0.0.0:3456", c.Zipkin.HTTPHostPort)
}

func TestCollectorOptionsWithFailedHTTPFlags(t *testing.T) {
c := &CollectorOptions{}
v, command := config.Viperize(AddFlags)
err := command.ParseFlags([]string{
"--collector.http.tls.enabled=false",
"--collector.http.tls.cert=blah", // invalid unless tls.enabled
})
require.NoError(t, err)
_, err = c.InitFromViper(v, zap.NewNop())
require.Error(t, err)
assert.Contains(t, err.Error(), "failed to parse HTTP TLS options")
}

func TestCollectorOptionsWithFailedGRPCFlags(t *testing.T) {
c := &CollectorOptions{}
v, command := config.Viperize(AddFlags)
err := command.ParseFlags([]string{
"--collector.grpc.tls.enabled=false",
"--collector.grpc.tls.cert=blah", // invalid unless tls.enabled
})
require.NoError(t, err)
_, err = c.InitFromViper(v, zap.NewNop())
require.Error(t, err)
assert.Contains(t, err.Error(), "failed to parse gRPC TLS options")
}

func TestCollectorOptionsWithFailedZipkinFlags(t *testing.T) {
c := &CollectorOptions{}
v, command := config.Viperize(AddFlags)
err := command.ParseFlags([]string{
"--collector.zipkin.tls.enabled=false",
"--collector.zipkin.tls.cert=blah", // invalid unless tls.enabled
})
require.NoError(t, err)
_, err = c.InitFromViper(v, zap.NewNop())
require.Error(t, err)
assert.Contains(t, err.Error(), "failed to parse Zipkin TLS options")
func TestCollectorOptionsWithFailedTLSFlags(t *testing.T) {
prefixes := []string{
"--collector.http",
"--collector.grpc",
"--collector.zipkin",
"--collector.otlp.http",
"--collector.otlp.grpc",
}
for _, prefix := range prefixes {
t.Run(prefix, func(t *testing.T) {
c := &CollectorOptions{}
v, command := config.Viperize(AddFlags)
err := command.ParseFlags([]string{
prefix + ".tls.enabled=false",
prefix + ".tls.cert=blah", // invalid unless tls.enabled
})
require.NoError(t, err)
_, err = c.InitFromViper(v, zap.NewNop())
require.Error(t, err)
assert.Contains(t, err.Error(), "failed to parse")
})
}
}

func TestCollectorOptionsWithFlags_CheckMaxReceiveMessageLength(t *testing.T) {
Expand Down
40 changes: 4 additions & 36 deletions cmd/collector/app/handler/otlp_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package handler
import (
"context"
"fmt"
"time"

otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/component"
Expand All @@ -31,45 +30,14 @@ import (
"github.com/jaegertracing/jaeger/cmd/collector/app/flags"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
)

var _ component.Host = (*otelHost)(nil) // API check

// OtelReceiverOptions allows configuration of the receiver.
type OtelReceiverOptions struct {
GRPC OtelReceiverGRPCOptions
HTTP OtelReceiverHTTPOptions
}

// OtelReceiverGRPCOptions allows configuration of the GRPC receiver.
type OtelReceiverGRPCOptions struct {
// HostPort is the host:port address that the server listens on
HostPort string
// TLS configures secure transport for HTTP endpoint
TLS tlscfg.Options
// MaxReceiveMessageLength is the maximum message size receivable by the gRPC Collector.
MaxReceiveMessageLength int
// MaxConnectionAge is a duration for the maximum amount of time a connection may exist.
// See gRPC's keepalive.ServerParameters#MaxConnectionAge.
MaxConnectionAge time.Duration
// MaxConnectionAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed.
// See gRPC's keepalive.ServerParameters#MaxConnectionAgeGrace.
MaxConnectionAgeGrace time.Duration
}

// OtelReceiverHTTPOptions defines options for an HTTP server
type OtelReceiverHTTPOptions struct {
// HostPort is the host:port address that the server listens on
HostPort string
// TLS configures secure transport for HTTP endpoint
TLS tlscfg.Options
}

// StartOtelReceiver starts OpenTelemetry OTLP receiver listening on gRPC and HTTP ports.
func StartOtelReceiver(options *flags.CollectorOptions, logger *zap.Logger, spanProcessor processor.SpanProcessor) (component.TracesReceiver, error) {
// StartOTLPReceiver starts OpenTelemetry OTLP receiver listening on gRPC and HTTP ports.
func StartOTLPReceiver(options *flags.CollectorOptions, logger *zap.Logger, spanProcessor processor.SpanProcessor) (component.TracesReceiver, error) {
otlpFactory := otlpreceiver.NewFactory()
return startOtelReceiver(
return startOTLPReceiver(
options,
logger,
spanProcessor,
Expand All @@ -82,7 +50,7 @@ func StartOtelReceiver(options *flags.CollectorOptions, logger *zap.Logger, span
// Some of OTELCOL constructor functions return errors when passed nil arguments,
// which is a situation we cannot reproduce. To test our own error handling, this
// function allows to mock those constructors.
func startOtelReceiver(
func startOTLPReceiver(
options *flags.CollectorOptions,
logger *zap.Logger,
spanProcessor processor.SpanProcessor,
Expand Down
8 changes: 4 additions & 4 deletions cmd/collector/app/handler/otlp_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func optionsWithPorts(port string) *flags.CollectorOptions {
func TestStartOtlpReceiver(t *testing.T) {
spanProcessor := &mockSpanProcessor{}
logger, _ := testutils.NewLogger()
rec, err := StartOtelReceiver(optionsWithPorts(":0"), logger, spanProcessor)
rec, err := StartOTLPReceiver(optionsWithPorts(":0"), logger, spanProcessor)
require.NoError(t, err)
defer func() {
assert.NoError(t, rec.Shutdown(context.Background()))
Expand Down Expand Up @@ -96,23 +96,23 @@ func TestStartOtlpReceiver_Error(t *testing.T) {
spanProcessor := &mockSpanProcessor{}
logger, _ := testutils.NewLogger()
opts := optionsWithPorts(":-1")
_, err := StartOtelReceiver(opts, logger, spanProcessor)
_, err := StartOTLPReceiver(opts, logger, spanProcessor)
require.Error(t, err)
assert.Contains(t, err.Error(), "could not start the OTLP receiver")

newTraces := func(_ consumer.ConsumeTracesFunc, _ ...consumer.Option) (consumer.Traces, error) {
return nil, errors.New("mock error")
}
f := otlpreceiver.NewFactory()
_, err = startOtelReceiver(opts, logger, spanProcessor, f, newTraces, f.CreateTracesReceiver)
_, err = startOTLPReceiver(opts, logger, spanProcessor, f, newTraces, f.CreateTracesReceiver)
require.Error(t, err)
assert.Contains(t, err.Error(), "could not create the OTLP consumer")

createTracesReceiver := func(_ context.Context, _ component.ReceiverCreateSettings,
_ config.Receiver, _ consumer.Traces) (component.TracesReceiver, error) {
return nil, errors.New("mock error")
}
_, err = startOtelReceiver(opts, logger, spanProcessor, f, consumer.NewTraces, createTracesReceiver)
_, err = startOTLPReceiver(opts, logger, spanProcessor, f, consumer.NewTraces, createTracesReceiver)
require.Error(t, err)
assert.Contains(t, err.Error(), "could not create the OTLP receiver")
}
Expand Down

0 comments on commit 9e18c86

Please sign in to comment.