diff --git a/cmd/collector/app/collector.go b/cmd/collector/app/collector.go index ee2fee33cc86..c3c3c68d49c2 100644 --- a/cmd/collector/app/collector.go +++ b/cmd/collector/app/collector.go @@ -109,7 +109,7 @@ func (c *Collector) Start(options *CollectorOptions) error { MaxConnectionAgeGrace: options.GRPC.MaxConnectionAgeGrace, }) if err != nil { - return fmt.Errorf("could not start gRPC collector: %w", err) + return fmt.Errorf("could not start gRPC server: %w", err) } c.grpcServer = grpcServer @@ -123,7 +123,7 @@ func (c *Collector) Start(options *CollectorOptions) error { Logger: c.logger, }) if err != nil { - return fmt.Errorf("could not start the HTTP server: %w", err) + return fmt.Errorf("could not start HTTP server: %w", err) } c.hServer = httpServer @@ -141,7 +141,7 @@ func (c *Collector) Start(options *CollectorOptions) error { MetricsFactory: c.metricsFactory, }) if err != nil { - return fmt.Errorf("could not start the Zipkin server: %w", err) + return fmt.Errorf("could not start Zipkin server: %w", err) } c.zkServer = zkServer @@ -154,7 +154,7 @@ func (c *Collector) Start(options *CollectorOptions) error { c.spanProcessor, ) if err != nil { - return err + return fmt.Errorf("could not start OTLP server: %w", err) } c.otlpReceiver = otlpReceiver @@ -171,12 +171,12 @@ func (c *Collector) publishOpts(cOpts *CollectorOptions) { // Close the component and all its underlying dependencies func (c *Collector) Close() error { - // gRPC server + // Stop gRPC server if c.grpcServer != nil { c.grpcServer.GracefulStop() } - // HTTP server + // Stop HTTP server if c.hServer != nil { timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) if err := c.hServer.Shutdown(timeout); err != nil { @@ -185,7 +185,7 @@ func (c *Collector) Close() error { defer cancel() } - // Zipkin server + // Stop Zipkin server if c.zkServer != nil { timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) if err := c.zkServer.Shutdown(timeout); err != nil { @@ -194,7 +194,7 @@ func (c *Collector) Close() error { defer cancel() } - // OpenTelemetry OTLP receiver + // Stop OpenTelemetry OTLP receiver if c.otlpReceiver != nil { timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) if err := c.otlpReceiver.Shutdown(timeout); err != nil { diff --git a/cmd/collector/app/collector_test.go b/cmd/collector/app/collector_test.go index d5e369e172b3..897912d617b5 100644 --- a/cmd/collector/app/collector_test.go +++ b/cmd/collector/app/collector_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/uber/jaeger-lib/metrics/fork" "github.com/uber/jaeger-lib/metrics/metricstest" "go.uber.org/zap" @@ -33,6 +34,15 @@ import ( var _ (io.Closer) = (*Collector)(nil) +func optionsForEphemeralPorts() *CollectorOptions { + collectorOpts := &CollectorOptions{} + collectorOpts.GRPC.HostPort = ":0" + collectorOpts.HTTP.HostPort = ":0" + collectorOpts.OTLP.GRPCHostPort = ":0" + collectorOpts.OTLP.HTTPHostPort = ":0" + return collectorOpts +} + func TestNewCollector(t *testing.T) { // prepare hc := healthcheck.New() @@ -49,13 +59,51 @@ func TestNewCollector(t *testing.T) { StrategyStore: strategyStore, HealthCheck: hc, }) - collectorOpts := &CollectorOptions{} + collectorOpts := optionsForEphemeralPorts() + require.NoError(t, c.Start(collectorOpts)) + assert.NoError(t, c.Close()) +} - // test - c.Start(collectorOpts) +func TestCollector_StartErrors(t *testing.T) { + run := func(name string, options *CollectorOptions, expErr string) { + t.Run(name, func(t *testing.T) { + hc := healthcheck.New() + logger := zap.NewNop() + baseMetrics := metricstest.NewFactory(time.Hour) + spanWriter := &fakeSpanWriter{} + strategyStore := &mockStrategyStore{} + + c := New(&CollectorParams{ + ServiceName: "collector", + Logger: logger, + MetricsFactory: baseMetrics, + SpanWriter: spanWriter, + StrategyStore: strategyStore, + HealthCheck: hc, + }) + err := c.Start(options) + require.Error(t, err) + assert.Contains(t, err.Error(), expErr) + }) + } - // verify - assert.NoError(t, c.Close()) + var options *CollectorOptions + + options = optionsForEphemeralPorts() + options.GRPC.HostPort = ":-1" + run("gRPC", options, "could not start gRPC server") + + options = optionsForEphemeralPorts() + options.HTTP.HostPort = ":-1" + run("HTTP", options, "could not start HTTP server") + + options = optionsForEphemeralPorts() + options.Zipkin.HTTPHostPort = ":-1" + run("Zipkin", options, "could not start Zipkin server") + + options = optionsForEphemeralPorts() + options.OTLP.HTTPHostPort = ":-1" + run("OTLP", options, "could not start OTLP server") } type mockStrategyStore struct { @@ -83,12 +131,11 @@ func TestCollector_PublishOpts(t *testing.T) { StrategyStore: strategyStore, HealthCheck: hc, }) - collectorOpts := &CollectorOptions{ - NumWorkers: 24, - QueueSize: 42, - } + collectorOpts := optionsForEphemeralPorts() + collectorOpts.NumWorkers = 24 + collectorOpts.QueueSize = 42 - c.Start(collectorOpts) + require.NoError(t, c.Start(collectorOpts)) defer c.Close() forkFactory.AssertGaugeMetrics(t, metricstest.ExpectedMetric{ @@ -119,16 +166,13 @@ func TestAggregator(t *testing.T) { HealthCheck: hc, Aggregator: agg, }) - collectorOpts := &CollectorOptions{ - QueueSize: 10, - NumWorkers: 10, - } - - // test - c.Start(collectorOpts) + collectorOpts := optionsForEphemeralPorts() + collectorOpts.NumWorkers = 10 + collectorOpts.QueueSize = 10 + require.NoError(t, c.Start(collectorOpts)) // assert that aggregator was added to the collector - _, err := c.spanProcessor.ProcessSpans([]*model.Span{ + spans := []*model.Span{ { OperationName: "y", Process: &model.Process{ @@ -145,15 +189,18 @@ func TestAggregator(t *testing.T) { }, }, }, - }, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat}) + } + _, err := c.spanProcessor.ProcessSpans(spans, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat}) assert.NoError(t, err) - - // verify assert.NoError(t, c.Close()) - // assert that aggregator was used - assert.Equal(t, 1, agg.callCount) - - // assert that aggregator close was called - assert.Equal(t, 1, agg.closeCount) + // spans are processed by background workers, so we may need to wait + for i := 0; i < 1000; i++ { + if agg.callCount.Load() == 1 && agg.closeCount.Load() == 1 { + break + } + time.Sleep(10 * time.Millisecond) + } + assert.EqualValues(t, 1, agg.callCount.Load(), "aggregator was used") + assert.EqualValues(t, 1, agg.closeCount.Load(), "aggregator close was called") } diff --git a/cmd/collector/app/root_span_handler_test.go b/cmd/collector/app/root_span_handler_test.go index 9f469d5580a1..c73cfce9a4c2 100644 --- a/cmd/collector/app/root_span_handler_test.go +++ b/cmd/collector/app/root_span_handler_test.go @@ -18,22 +18,23 @@ import ( "testing" "github.com/stretchr/testify/assert" + "go.uber.org/atomic" "go.uber.org/zap" "github.com/jaegertracing/jaeger/model" ) type mockAggregator struct { - callCount int - closeCount int + callCount atomic.Int32 + closeCount atomic.Int32 } func (t *mockAggregator) RecordThroughput(service, operation, samplerType string, probability float64) { - t.callCount++ + t.callCount.Inc() } func (t *mockAggregator) Start() {} func (t *mockAggregator) Close() error { - t.closeCount++ + t.closeCount.Inc() return nil } @@ -44,7 +45,7 @@ func TestHandleRootSpan(t *testing.T) { // Testing non-root span span := &model.Span{References: []model.SpanRef{{SpanID: model.NewSpanID(1), RefType: model.ChildOf}}} processor(span, "") - assert.Equal(t, 0, aggregator.callCount) + assert.EqualValues(t, 0, aggregator.callCount.Load()) // Testing span with service name but no operation span.References = []model.SpanRef{} @@ -52,12 +53,12 @@ func TestHandleRootSpan(t *testing.T) { ServiceName: "service", } processor(span, "") - assert.Equal(t, 0, aggregator.callCount) + assert.EqualValues(t, 0, aggregator.callCount.Load()) // Testing span with service name and operation but no probabilistic sampling tags span.OperationName = "GET" processor(span, "") - assert.Equal(t, 0, aggregator.callCount) + assert.EqualValues(t, 0, aggregator.callCount.Load()) // Testing span with service name, operation, and probabilistic sampling tags span.Tags = model.KeyValues{ @@ -65,5 +66,5 @@ func TestHandleRootSpan(t *testing.T) { model.String("sampler.param", "0.001"), } processor(span, "") - assert.Equal(t, 1, aggregator.callCount) + assert.EqualValues(t, 1, aggregator.callCount.Load()) }